Updated Branches: refs/heads/trunk ecda83471 -> 9cf915fd4
parallelize fetching rows for low-cardinality indexes
patch by David Alves; reviewed by Vijay for CASSANDRA-1337

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cf915fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cf915fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cf915fd

Branch: refs/heads/trunk
Commit: 9cf915fd464b6f8a6aa9d54a762ad8796872681a
Parents: ecda834
Author: Vijay Parthasarathy <vijay2...@gmail.com>
Authored: Thu Jun 21 21:20:09 2012 -0700
Committer: Vijay Parthasarathy <vijay2...@gmail.com>
Committed: Thu Jun 21 21:20:09 2012 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/service/StorageProxy.java | 66 ++++++++++-----
 1 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cf915fd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 92a3256..4353a8f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -845,6 +845,22 @@ public class StorageProxy implements StorageProxyMBean
             int columnsCount = 0;
             rows = new ArrayList<Row>();
             List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
+
+            // get the cardinality of this index based on row count
+            // use this info to decide how many scans to do in parallel
+            long estimatedKeys = Table.open(command.keyspace).getColumnFamilyStore(command.column_family)
+                    .estimateKeys();
+            int concurrencyFactor = (int) command.maxResults / ((int) estimatedKeys + 1);
+
+            if (concurrencyFactor <= 0)
+                concurrencyFactor = 1;
+
+            if (concurrencyFactor > ranges.size())
+                concurrencyFactor = ranges.size();
+
+            // parallel scan handlers
+            List<ReadCallback<RangeSliceReply, Iterable<Row>>> scanHandlers = new ArrayList<ReadCallback<RangeSliceReply, Iterable<Row>>>(concurrencyFactor);
+
             for (AbstractBounds<RowPosition> range : ranges)
             {
                 RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
@@ -895,32 +911,40 @@ public class StorageProxy implements StorageProxyMBean
                             logger.debug("reading " + nodeCmd + " from " + endpoint);
                     }
 
-                    try
+                    scanHandlers.add(handler);
+                    if (scanHandlers.size() >= concurrencyFactor)
                     {
-                        for (Row row : handler.get())
+                        for (ReadCallback<RangeSliceReply, Iterable<Row>> scanHandler : scanHandlers)
                         {
-                            rows.add(row);
-                            columnsCount += row.getLiveColumnCount();
-                            logger.debug("range slices read {}", row.key);
+                            try
+                            {
+                                for (Row row : scanHandler.get())
+                                {
+                                    rows.add(row);
+                                    columnsCount += row.getLiveColumnCount();
+                                    logger.debug("range slices read {}", row.key);
+                                }
+                                FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getReadRpcTimeout());
+                            }
+                            catch (TimeoutException ex)
+                            {
+                                if (logger.isDebugEnabled())
+                                    logger.debug("Range slice timeout: {}", ex.toString());
+                                throw ex;
+                            }
+                            catch (DigestMismatchException e)
+                            {
+                                throw new AssertionError(e); // no digests in range slices yet
+                            }
+
+                            // if we're done, great, otherwise, move to the next range
+                            int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
+                            if (count >= nodeCmd.maxResults)
+                                break;
                         }
-                        FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
-                    }
-                    catch (TimeoutException ex)
-                    {
-                        if (logger.isDebugEnabled())
-                            logger.debug("Range slice timeout: {}", ex.toString());
-                        throw ex;
-                    }
-                    catch (DigestMismatchException e)
-                    {
-                        throw new AssertionError(e); // no digests in range slices yet
+                        scanHandlers.clear(); //go back for more
                     }
                 }
-
-                // if we're done, great, otherwise, move to the next range
-                int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
-                if (count >= nodeCmd.maxResults)
-                    break;
             }
         }
         finally