Updated Branches: refs/heads/trunk db59808bf -> 0456b7eb2
Allow doing a range slice with a limit of columns instead of rows patch by slebresne; reviewed by jbellis for CASSANDRA-3742 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0456b7eb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0456b7eb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0456b7eb Branch: refs/heads/trunk Commit: 0456b7eb231275fba368d35fb861b09096fdd8fb Parents: db59808 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Jan 17 09:40:04 2012 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Jan 17 09:41:45 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + src/java/org/apache/cassandra/db/ColumnFamily.java | 4 - .../org/apache/cassandra/db/ColumnFamilyStore.java | 21 +++++- .../org/apache/cassandra/db/RangeSliceCommand.java | 49 ++++++++++--- src/java/org/apache/cassandra/db/Row.java | 5 + .../apache/cassandra/db/filter/ExtendedFilter.java | 45 +++++++++-- .../org/apache/cassandra/db/filter/IFilter.java | 1 + .../cassandra/db/filter/NamesQueryFilter.java | 4 + .../cassandra/db/filter/SliceQueryFilter.java | 10 ++- .../cassandra/db/index/SecondaryIndexManager.java | 4 +- .../cassandra/db/index/SecondaryIndexSearcher.java | 2 +- .../cassandra/db/index/keys/KeysSearcher.java | 15 ++-- .../cassandra/service/IndexScanVerbHandler.java | 8 +- .../cassandra/service/RangeSliceVerbHandler.java | 4 +- .../org/apache/cassandra/service/StorageProxy.java | 23 +++++- .../apache/cassandra/db/ColumnFamilyStoreTest.java | 58 ++++++++++++++- 16 files changed, 204 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 704630e..68147d7 100644 --- 
a/CHANGES.txt +++ b/CHANGES.txt @@ -43,6 +43,8 @@ * Fix ClassCastException during hinted handoff (CASSANDRA-3694) * Upgrade Thrift to 0.7 (CASSANDRA-3213) * Make stress.java insert operation to use microseconds (CASSANDRA-3725) + * Allows (internally) doing a range query with a limit of columns instead of + rows (CASSANDRA-3742) 1.0.7 http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index af089cb..ee92c44 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -124,10 +124,6 @@ public class ColumnFamily extends AbstractColumnContainer return cfm; } - /** - * FIXME: shouldn't need to hold a reference to a serializer; worse, for super cfs, - * it will be a _unique_ serializer object per row - */ public IColumnSerializer getColumnSerializer() { return cfm.getColumnSerializer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 2cc2afe..805c55c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1290,20 +1290,31 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IFilter columnFilter, List<IndexExpression> rowFilter) { - return filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults)); + return 
getRangeSlice(superColumn, range, maxResults, columnFilter, rowFilter, false); + } + + public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IFilter columnFilter, List<IndexExpression> rowFilter, boolean maxIsColumns) + { + return filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, maxIsColumns)); } public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter) { - return indexManager.search(clause, range, maxResults, dataFilter); + return search(clause, range, maxResults, dataFilter, false); + } + + public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter, boolean maxIsColumns) + { + return indexManager.search(clause, range, maxResults, dataFilter, maxIsColumns); } public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter) { List<Row> rows = new ArrayList<Row>(); + int columnsCount = 0; try { - while (rowIterator.hasNext() && rows.size() < filter.maxResults) + while (rowIterator.hasNext() && rows.size() < filter.maxRows() && columnsCount < filter.maxColumns()) { // get the raw columns requested, and additional columns for the expressions if necessary Row rawRow = rowIterator.next(); @@ -1326,6 +1337,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // cut the resultset back to what was requested, if necessary data = filter.prune(data); rows.add(new Row(rawRow.key, data)); + if (data != null) + columnsCount += data.getLiveColumnCount(); + // Update the underlying filter to avoid querying more columns per slice than necessary + filter.updateColumnsLimit(columnsCount); } return rows; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/RangeSliceCommand.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java index 4ab821e..d7310b4 100644 --- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java +++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java @@ -74,27 +74,44 @@ public class RangeSliceCommand implements MessageProducer, IReadCommand public final List<IndexExpression> row_filter; public final AbstractBounds<RowPosition> range; - public final int max_keys; + public final int maxResults; + public final boolean maxIsColumns; - public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int max_keys) + public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int maxResults) { - this(keyspace, column_family, super_column, predicate, range, null, max_keys); + this(keyspace, column_family, super_column, predicate, range, null, maxResults, false); } - public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int max_keys) + public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int maxResults, boolean maxIsColumns) { - this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, row_filter, max_keys); + this(keyspace, column_family, super_column, predicate, range, null, maxResults, maxIsColumns); } - public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int max_keys) + public RangeSliceCommand(String keyspace, ColumnParent 
column_parent, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults) + { + this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, row_filter, maxResults, false); + } + + public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean maxIsColumns) + { + this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, row_filter, maxResults, maxIsColumns); + } + + public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults) + { + this(keyspace, column_family, super_column, predicate, range, row_filter, maxResults, false); + } + + public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean maxIsColumns) { this.keyspace = keyspace; this.column_family = column_family; this.super_column = super_column; this.predicate = predicate; this.range = range; - this.max_keys = max_keys; this.row_filter = row_filter; + this.maxResults = maxResults; + this.maxIsColumns = maxIsColumns; } public Message getMessage(Integer version) throws IOException @@ -115,8 +132,9 @@ public class RangeSliceCommand implements MessageProducer, IReadCommand ", super_column=" + super_column + ", predicate=" + predicate + ", range=" + range + - ", max_keys=" + max_keys + ", row_filter =" + row_filter + + ", maxResults=" + maxResults + + ", maxIsColumns=" + maxIsColumns + '}'; } @@ -161,7 +179,11 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm } } AbstractBounds.serializer().serialize(sliceCommand.range, dos, version); - 
dos.writeInt(sliceCommand.max_keys); + dos.writeInt(sliceCommand.maxResults); + if (version >= MessagingService.VERSION_11) + { + dos.writeBoolean(sliceCommand.maxIsColumns); + } } public RangeSliceCommand deserialize(DataInput dis, int version) throws IOException @@ -196,8 +218,13 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm } AbstractBounds<RowPosition> range = AbstractBounds.serializer().deserialize(dis, version).toRowBounds(); - int maxKeys = dis.readInt(); - return new RangeSliceCommand(keyspace, columnFamily, superColumn, pred, range, rowFilter, maxKeys); + int maxResults = dis.readInt(); + boolean maxIsColumns = false; + if (version >= MessagingService.VERSION_11) + { + maxIsColumns = dis.readBoolean(); + } + return new RangeSliceCommand(keyspace, columnFamily, superColumn, pred, range, rowFilter, maxResults, maxIsColumns); } public long serializedSize(RangeSliceCommand rangeSliceCommand, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java index 81cfc7b..8856acd 100644 --- a/src/java/org/apache/cassandra/db/Row.java +++ b/src/java/org/apache/cassandra/db/Row.java @@ -45,6 +45,11 @@ public class Row this.cf = cf; } + public int getLiveColumnCount() + { + return cf == null ? 
0 : cf.getLiveColumnCount(); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java index bf56e99..205d3c8 100644 --- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java @@ -23,24 +23,51 @@ public abstract class ExtendedFilter private static Logger logger = LoggerFactory.getLogger(ExtendedFilter.class); public final ColumnFamilyStore cfs; - public final int maxResults; protected final IFilter originalFilter; + private final int maxResults; + private final boolean maxIsColumns; - public static ExtendedFilter create(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults) + public static ExtendedFilter create(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns) { if (clause == null || clause.isEmpty()) - return new EmptyClauseFilter(cfs, filter, maxResults); + return new EmptyClauseFilter(cfs, filter, maxResults, maxIsColumns); else - return new FilterWithClauses(cfs, filter, clause, maxResults); + return new FilterWithClauses(cfs, filter, clause, maxResults, maxIsColumns); } - protected ExtendedFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults) + protected ExtendedFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults, boolean maxIsColumns) { assert cfs != null; assert filter != null; this.cfs = cfs; this.originalFilter = filter; this.maxResults = maxResults; + this.maxIsColumns = maxIsColumns; + if (maxIsColumns) + originalFilter.updateColumnsLimit(maxResults); + } + + public int maxRows() + { + return maxIsColumns ? 
Integer.MAX_VALUE : maxResults; + } + + public int maxColumns() + { + return maxIsColumns ? maxResults : Integer.MAX_VALUE; + } + + /** + * Update the filter if necessary given the number of columns already + * fetched. + */ + public void updateColumnsLimit(int columnsCount) + { + if (!maxIsColumns) + return; + + int remaining = maxResults - columnsCount; + initialFilter().updateColumnsLimit(remaining); } /** The initial filter we'll do our first slice with (either the original or a superset of it) */ @@ -90,9 +117,9 @@ public abstract class ExtendedFilter protected final List<IndexExpression> clause; protected final IFilter initialFilter; - public FilterWithClauses(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults) + public FilterWithClauses(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns) { - super(cfs, filter, maxResults); + super(cfs, filter, maxResults, maxIsColumns); assert clause != null; this.clause = clause; this.initialFilter = computeInitialFilter(); @@ -217,9 +244,9 @@ public abstract class ExtendedFilter private static class EmptyClauseFilter extends ExtendedFilter { - public EmptyClauseFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults) + public EmptyClauseFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults, boolean maxIsColumns) { - super(cfs, filter, maxResults); + super(cfs, filter, maxResults, maxIsColumns); } public IFilter initialFilter() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/filter/IFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/IFilter.java b/src/java/org/apache/cassandra/db/filter/IFilter.java index c3a0d05..e226ec6 100644 --- a/src/java/org/apache/cassandra/db/filter/IFilter.java +++ b/src/java/org/apache/cassandra/db/filter/IFilter.java @@ -75,4 +75,5 @@ public interface IFilter public 
Comparator<IColumn> getColumnComparator(AbstractType comparator); public boolean isReversed(); + public void updateColumnsLimit(int newLimit); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java index 9a44aaf..3403727 100644 --- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java @@ -104,4 +104,8 @@ public class NamesQueryFilter implements IFilter { return false; } + + public void updateColumnsLimit(int newLimit) + { + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index d332134..efe9ec2 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -43,10 +43,9 @@ public class SliceQueryFilter implements IFilter { private static Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class); - public final ByteBuffer start; - public final ByteBuffer finish; + public final ByteBuffer start; public final ByteBuffer finish; public final boolean reversed; - public final int count; + public volatile int count; public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean reversed, int count) { @@ -155,4 +154,9 @@ public class SliceQueryFilter implements IFilter { return reversed; } + + public void updateColumnsLimit(int newLimit) + { + count = newLimit; + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index 01b795f..12f9d4c 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -550,7 +550,7 @@ public class SecondaryIndexManager * @param dataFilter the column range to restrict to * @return found indexed rows */ - public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter) + public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter, boolean maxIsColumns) { List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause); @@ -562,6 +562,6 @@ public class SecondaryIndexManager throw new RuntimeException("Unable to search across multiple secondary index types"); - return indexSearchers.get(0).search(clause, range, maxResults, dataFilter); + return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, maxIsColumns); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java index 6365c81..a74b0de 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java @@ -40,7 +40,7 @@ public abstract class SecondaryIndexSearcher this.baseCfs = indexManager.baseCfs; } - public abstract List<Row> 
search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter); + public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter, boolean maxIsColumns); /** * @return true this index is able to handle given clauses. http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index 4f975eb..7237a87 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -84,10 +84,10 @@ public class KeysSearcher extends SecondaryIndexSearcher } @Override - public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter) + public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter, boolean maxIsColumns) { assert clause != null && !clause.isEmpty(); - ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults); + ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, maxIsColumns); return baseCfs.filter(getIndexedIterator(range, filter), filter); } @@ -121,13 +121,16 @@ public class KeysSearcher extends SecondaryIndexSearcher protected Row computeNext() { + int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1); + // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses + int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2); while (true) { if (indexColumns == null || !indexColumns.hasNext()) { - if (columnsRead < 
filter.maxResults) + if (columnsRead < rowsPerQuery) { - logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, filter.maxResults); + logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery); return endOfData(); } @@ -135,14 +138,12 @@ public class KeysSearcher extends SecondaryIndexSearcher logger.debug(String.format("Scanning index %s starting with %s", expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey))); - // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses - int count = Math.max(filter.maxResults, 2); QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey, new QueryPath(index.getIndexCfs().getColumnFamilyName()), lastSeenKey, endKey, false, - count); + rowsPerQuery); ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter); logger.debug("fetched {}", indexRow); if (indexRow == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java index 066b2e4..a5a1b20 100644 --- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java +++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java @@ -39,10 +39,10 @@ public class IndexScanVerbHandler implements IVerbHandler { IndexScanCommand command = IndexScanCommand.read(message); ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family); - List<Row> rows = cfs.indexManager.search(command.index_clause.expressions, - command.range, - command.index_clause.count, - QueryFilter.getFilter(command.predicate, cfs.getComparator())); + List<Row> rows = cfs.search(command.index_clause.expressions, + 
command.range, + command.index_clause.count, + QueryFilter.getFilter(command.predicate, cfs.getComparator())); RangeSliceReply reply = new RangeSliceReply(rows); Message response = reply.getReply(message); if (logger.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java index 2353b71..76823de 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java +++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java @@ -45,9 +45,9 @@ public class RangeSliceVerbHandler implements IVerbHandler IFilter columnFilter = QueryFilter.getFilter(command.predicate, cfs.getComparator()); if (cfs.indexManager.hasIndexFor(command.row_filter)) - return cfs.search(command.row_filter, command.range, command.max_keys, columnFilter); + return cfs.search(command.row_filter, command.range, command.maxResults, columnFilter, command.maxIsColumns); else - return cfs.getRangeSlice(command.super_column, command.range, command.max_keys, columnFilter, command.row_filter); + return cfs.getRangeSlice(command.super_column, command.range, command.maxResults, columnFilter, command.row_filter, command.maxIsColumns); } public void doVerb(Message message, String id) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 636e15d..4137632 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -826,7 +826,8 @@ public class StorageProxy 
implements StorageProxyMBean // now scan until we have enough results try { - rows = new ArrayList<Row>(command.max_keys); + int columnsCount = 0; + rows = new ArrayList<Row>(); List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range); for (AbstractBounds<RowPosition> range : ranges) { @@ -836,7 +837,8 @@ public class StorageProxy implements StorageProxyMBean command.predicate, range, command.row_filter, - command.max_keys); + command.maxResults, + command.maxIsColumns); List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right); DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints); @@ -849,6 +851,8 @@ public class StorageProxy implements StorageProxyMBean try { rows.addAll(RangeSliceVerbHandler.executeLocally(nodeCmd)); + for (Row row : rows) + columnsCount += row.getLiveColumnCount(); } catch (ExecutionException e) { @@ -877,6 +881,7 @@ public class StorageProxy implements StorageProxyMBean for (Row row : handler.get()) { rows.add(row); + columnsCount += row.getLiveColumnCount(); logger.debug("range slices read {}", row.key); } FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRpcTimeout()); @@ -894,7 +899,8 @@ public class StorageProxy implements StorageProxyMBean } // if we're done, great, otherwise, move to the next range - if (rows.size() >= nodeCmd.max_keys) + int count = nodeCmd.maxIsColumns ? columnsCount : rows.size(); + if (count >= nodeCmd.maxResults) break; } } @@ -902,7 +908,16 @@ public class StorageProxy implements StorageProxyMBean { rangeStats.addNano(System.nanoTime() - startTime); } - return rows.size() > command.max_keys ? rows.subList(0, command.max_keys) : rows; + return trim(command, rows); + } + + private static List<Row> trim(RangeSliceCommand command, List<Row> rows) + { + // When maxIsColumns, we let the caller trim the result. 
+ if (command.maxIsColumns) + return rows; + else + return rows.size() > command.maxResults ? rows.subList(0, command.maxResults) : rows; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index c5c4273..d2b0a0a 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -193,7 +193,7 @@ public class ColumnFamilyStoreTest extends CleanupHelper IFilter filter = new IdentityQueryFilter(); IPartitioner p = StorageService.getPartitioner(); Range<RowPosition> range = Util.range("", ""); - List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").indexManager.search(clause, range, 100, filter); + List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter); assert rows != null; assert rows.size() == 2 : StringUtils.join(rows, ","); @@ -786,4 +786,60 @@ public class ColumnFamilyStoreTest extends CleanupHelper Column column = (Column) cf.getColumn(cname); assert column.value().equals(ByteBufferUtil.bytes("a")) : "expecting a, got " + ByteBufferUtil.string(column.value()); } + + private static void assertTotalColCount(Collection<Row> rows, int expectedCount) throws CharacterCodingException + { + int columns = 0; + for (Row row : rows) + { + columns += row.getLiveColumnCount(); + } + assert columns == expectedCount : "Expected " + expectedCount + " live columns but got " + columns + ": " + rows; + } + + @Test + public void testRangeSliceColumnsLimit() throws Throwable + { + String tableName = "Keyspace1"; + String cfName = "Standard1"; + Table table = Table.open(tableName); + ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName); + 
cfs.clearUnsafe(); + + Column[] cols = new Column[5]; + for (int i = 0; i < 5; i++) + cols[i] = column("c" + i, "value", 1); + + putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3], cols[4]); + putColsStandard(cfs, Util.dk("b"), cols[0], cols[1]); + putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]); + cfs.forceBlockingFlush(); + + SlicePredicate sp = new SlicePredicate(); + sp.setSlice_range(new SliceRange()); + sp.getSlice_range().setCount(1); + sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY); + sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY); + + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 3, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 3); + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 5, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 5); + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 8, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 8); + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 10, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 10); + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 100, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 11); + + // Check that when querying by name, we always include all names for a + // given row even if it means returning more columns than requested (this is necessary for CQL) + sp = new SlicePredicate(); + sp.setColumn_names(Arrays.asList( + ByteBufferUtil.bytes("c0"), + ByteBufferUtil.bytes("c1"), + ByteBufferUtil.bytes("c2") + )); + + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 1, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 3); + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 4, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 5); + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 5, QueryFilter.getFilter(sp, 
cfs.getComparator()), null, true), 5); + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 6, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 8); + assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 100, QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 8); + } }