Repository: cassandra Updated Branches: refs/heads/trunk 95638b6e2 -> afb52aa9a
Add support for top-k custom 2i queries patch by Andrés de la Peña; reviewed by Sam Tunnicliffe for CASSANDRA-8717 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c7c5be7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c7c5be7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c7c5be7 Branch: refs/heads/trunk Commit: 4c7c5be798e2a7d1e72d086bc5011242ea0173dc Parents: 5bae5a3 Author: Andrés de la Peña <adelap...@stratio.com> Authored: Tue May 5 20:45:09 2015 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue May 5 20:46:12 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/AbstractRangeCommand.java | 23 ++++++++ .../db/index/SecondaryIndexManager.java | 48 ++++++++++------ .../db/index/SecondaryIndexSearcher.java | 44 ++++++++++++--- .../db/index/composites/CompositesSearcher.java | 2 +- .../cassandra/db/index/keys/KeysSearcher.java | 2 +- .../apache/cassandra/service/StorageProxy.java | 58 ++++++++++++-------- 7 files changed, 127 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 64d0760..da14ca3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.6 + * Add support for top-k custom 2i queries (CASSANDRA-8717) * Fix error when dropping table during compaction (CASSANDRA-9251) * cassandra-stress supports validation operations over user profiles (CASSANDRA-8773) * Add support for rate limiting log messages (CASSANDRA-9029) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/AbstractRangeCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java index b358f1b..959b524 100644 --- a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java +++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.index.*; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.service.IReadCommand; @@ -35,6 +36,8 @@ public abstract class AbstractRangeCommand implements IReadCommand public final IDiskAtomFilter predicate; public final List<IndexExpression> rowFilter; + public final SecondaryIndexSearcher searcher; + public AbstractRangeCommand(String keyspace, String columnFamily, long timestamp, AbstractBounds<RowPosition> keyRange, IDiskAtomFilter predicate, List<IndexExpression> rowFilter) { this.keyspace = keyspace; @@ -43,6 +46,26 @@ public abstract class AbstractRangeCommand implements IReadCommand this.keyRange = keyRange; this.predicate = predicate; this.rowFilter = rowFilter; + SecondaryIndexManager indexManager = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily).indexManager; + this.searcher = indexManager.getHighestSelectivityIndexSearcher(rowFilter); + } + + public boolean requiresScanningAllRanges() + { + return searcher != null && searcher.requiresScanningAllRanges(rowFilter); + } + + public List<Row> postReconciliationProcessing(List<Row> rows) + { + return searcher == null ? trim(rows) : trim(searcher.postReconciliationProcessing(rowFilter, rows)); + } + + private List<Row> trim(List<Row> rows) + { + if (countCQL3Rows()) + return rows; + else + return rows.size() > limit() ? rows.subList(0, limit()) : rows; } public String getKeyspace() http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index e4a9ff8..ab6df1e 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -638,25 +638,11 @@ public class SecondaryIndexManager */ public List<Row> search(ExtendedFilter filter) { - List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(filter.getClause()); - - if (indexSearchers.isEmpty()) + SecondaryIndexSearcher mostSelective = getHighestSelectivityIndexSearcher(filter.getClause()); + if (mostSelective == null) return Collections.emptyList(); - - SecondaryIndexSearcher mostSelective = null; - long bestEstimate = Long.MAX_VALUE; - for (SecondaryIndexSearcher searcher : indexSearchers) - { - SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(filter.getClause()); - long estimate = highestSelectivityIndex.estimateResultRows(); - if (estimate <= bestEstimate) - { - bestEstimate = estimate; - mostSelective = searcher; - } - } - - return mostSelective.search(filter); + else + return mostSelective.search(filter); } public Set<SecondaryIndex> getIndexesByNames(Set<String> idxNames) @@ -849,4 +835,30 @@ public class SecondaryIndexManager } } + + public SecondaryIndexSearcher getHighestSelectivityIndexSearcher(List<IndexExpression> clause) + { + if (clause == null) + return null; + + List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause); + + if (indexSearchers.isEmpty()) + return null; + + SecondaryIndexSearcher mostSelective = null; + long bestEstimate = Long.MAX_VALUE; + for (SecondaryIndexSearcher searcher : indexSearchers) + { + SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(clause); + long estimate = highestSelectivityIndex.estimateResultRows(); + if (estimate <= bestEstimate) + { + bestEstimate = estimate; + mostSelective = searcher; + } + } + + return mostSelective; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java index 93e0643..ab2cd75 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java @@ -43,7 +43,7 @@ public abstract class SecondaryIndexSearcher public SecondaryIndex highestSelectivityIndex(List<IndexExpression> clause) { - IndexExpression expr = highestSelectivityPredicate(clause); + IndexExpression expr = highestSelectivityPredicate(clause, false); return expr == null ? null : indexManager.getIndexForColumn(expr.column); } @@ -77,7 +77,7 @@ public abstract class SecondaryIndexSearcher { } - protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause) + protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause, boolean includeInTrace) { IndexExpression best = null; int bestMeanCount = Integer.MAX_VALUE; @@ -102,12 +102,40 @@ public abstract class SecondaryIndexSearcher } } - if (best == null) - Tracing.trace("No applicable indexes found"); - else - Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.", - FBUtilities.toString(candidates), indexManager.getIndexForColumn(best.column).getIndexName()); - + if (includeInTrace) + { + if (best == null) + Tracing.trace("No applicable indexes found"); + else if (Tracing.isTracing()) + // pay for an additional threadlocal get() rather than build the strings unnecessarily + Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.", + FBUtilities.toString(candidates), + indexManager.getIndexForColumn(best.column).getIndexName()); + } return best; } + + /** + * Returns {@code true} if the specified list of {@link IndexExpression}s require a full scan of all the nodes. + * + * @param clause A list of {@link IndexExpression}s + * @return {@code true} if the {@code IndexExpression}s require a full scan, {@code false} otherwise + */ + public boolean requiresScanningAllRanges(List<IndexExpression> clause) + { + return false; + } + + /** + * Combines index query results from multiple nodes. This is done by the coordinator node after it has reconciled + * the replica responses. + * + * @param clause A list of {@link IndexExpression}s + * @param rows The index query results to be combined + * @return The combination of the index query results + */ + public List<Row> postReconciliationProcessing(List<IndexExpression> clause, List<Row> rows) + { + return rows; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index 3e523f4..a2d08e7 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -59,7 +59,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher public List<Row> search(ExtendedFilter filter) { assert filter.getClause() != null && !filter.getClause().isEmpty(); - final IndexExpression primary = highestSelectivityPredicate(filter.getClause()); + final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true); final CompositesIndex index = (CompositesIndex)indexManager.getIndexForColumn(primary.column); // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index 4055b7c..634bb0c 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -53,7 +53,7 @@ public class KeysSearcher extends SecondaryIndexSearcher public List<Row> search(ExtendedFilter filter) { assert filter.getClause() != null && !filter.getClause().isEmpty(); - final IndexExpression primary = highestSelectivityPredicate(filter.getClause()); + final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true); final SecondaryIndex index = indexManager.getIndexForColumn(primary.column); // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index b41429e..1536e46 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1621,17 +1621,39 @@ public class StorageProxy implements StorageProxyMBean else ranges = getRestrictedRanges(command.keyRange); - // our estimate of how many result rows there will be per-range - float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace); - // underestimate how many rows we will get per-range in order to increase the likelihood that we'll - // fetch enough rows in the first round - resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; - int concurrencyFactor = resultRowsPerRange == 0.0 + // determine the number of rows to be fetched and the concurrency factor + int rowsToBeFetched = command.limit(); + int concurrencyFactor; + if (command.requiresScanningAllRanges()) + { + // all nodes must be queried + rowsToBeFetched *= ranges.size(); + concurrencyFactor = ranges.size(); + logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}", + command.limit(), + ranges.size(), + concurrencyFactor); + Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}", + new Object[]{ ranges.size(), concurrencyFactor}); + } + else + { + // our estimate of how many result rows there will be per-range + float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace); + // underestimate how many rows we will get per-range in order to increase the likelihood that we'll + // fetch enough rows in the first round + resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; + concurrencyFactor = resultRowsPerRange == 0.0 ? 1 : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange))); - logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", - resultRowsPerRange, command.limit(), ranges.size(), concurrencyFactor); - Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", new Object[]{ ranges.size(), concurrencyFactor, resultRowsPerRange}); + logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", + resultRowsPerRange, + command.limit(), + ranges.size(), + concurrencyFactor); + Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", + new Object[]{ ranges.size(), concurrencyFactor, resultRowsPerRange}); + } boolean haveSufficientRows = false; int i = 0; @@ -1723,7 +1745,6 @@ public class StorageProxy implements StorageProxyMBean List<AsyncOneResponse> repairResponses = new ArrayList<>(); for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers) { - AbstractRangeCommand nodeCmd = cmdPairHandler.left; ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right; RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver; @@ -1765,7 +1786,7 @@ public class StorageProxy implements StorageProxyMBean // if we're done, great, otherwise, move to the next range int count = countLiveRows ? liveRowCount : rows.size(); - if (count >= nodeCmd.limit()) + if (count >= rowsToBeFetched) { haveSufficientRows = true; break; @@ -1788,14 +1809,14 @@ public class StorageProxy implements StorageProxyMBean } if (haveSufficientRows) - return trim(command, rows); + return command.postReconciliationProcessing(rows); // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor // based on the results we've seen so far (as long as we still have ranges left to query) if (i < ranges.size()) { float fetchedRows = countLiveRows ? liveRowCount : rows.size(); - float remainingRows = command.limit() - fetchedRows; + float remainingRows = rowsToBeFetched - fetchedRows; float actualRowsPerRange; if (fetchedRows == 0.0) { @@ -1819,16 +1840,7 @@ public class StorageProxy implements StorageProxyMBean rangeMetrics.addNano(latency); Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS); } - return trim(command, rows); - } - - private static List<Row> trim(AbstractRangeCommand command, List<Row> rows) - { - // for CQL3 queries, let the caller trim the results - if (command.countCQL3Rows() || command.ignoredTombstonedPartitions()) - return rows; - else - return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows; + return command.postReconciliationProcessing(rows); } public Map<String, List<String>> getSchemaVersions()