Repository: cassandra
Updated Branches:
  refs/heads/trunk 95638b6e2 -> afb52aa9a


Add support for top-k custom 2i queries

patch by Andrés de la Peña; reviewed by Sam Tunnicliffe for
CASSANDRA-8717


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c7c5be7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c7c5be7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c7c5be7

Branch: refs/heads/trunk
Commit: 4c7c5be798e2a7d1e72d086bc5011242ea0173dc
Parents: 5bae5a3
Author: Andrés de la Peña <adelap...@stratio.com>
Authored: Tue May 5 20:45:09 2015 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue May 5 20:46:12 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/AbstractRangeCommand.java      | 23 ++++++++
 .../db/index/SecondaryIndexManager.java         | 48 ++++++++++------
 .../db/index/SecondaryIndexSearcher.java        | 44 ++++++++++++---
 .../db/index/composites/CompositesSearcher.java |  2 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |  2 +-
 .../apache/cassandra/service/StorageProxy.java  | 58 ++++++++++++--------
 7 files changed, 127 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64d0760..da14ca3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.6
+ * Add support for top-k custom 2i queries (CASSANDRA-8717)
  * Fix error when dropping table during compaction (CASSANDRA-9251)
  * cassandra-stress supports validation operations over user profiles (CASSANDRA-8773)
  * Add support for rate limiting log messages (CASSANDRA-9029)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
index b358f1b..959b524 100644
--- a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.*;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.service.IReadCommand;
@@ -35,6 +36,8 @@ public abstract class AbstractRangeCommand implements IReadCommand
     public final IDiskAtomFilter predicate;
     public final List<IndexExpression> rowFilter;
 
+    public final SecondaryIndexSearcher searcher;
+
     public AbstractRangeCommand(String keyspace, String columnFamily, long timestamp, AbstractBounds<RowPosition> keyRange, IDiskAtomFilter predicate, List<IndexExpression> rowFilter)
     {
         this.keyspace = keyspace;
@@ -43,6 +46,26 @@ public abstract class AbstractRangeCommand implements IReadCommand
         this.keyRange = keyRange;
         this.predicate = predicate;
         this.rowFilter = rowFilter;
+        SecondaryIndexManager indexManager = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily).indexManager;
+        this.searcher = indexManager.getHighestSelectivityIndexSearcher(rowFilter);
+    }
+
+    public boolean requiresScanningAllRanges()
+    {
+        return searcher != null && searcher.requiresScanningAllRanges(rowFilter);
+    }
+
+    public List<Row> postReconciliationProcessing(List<Row> rows)
+    {
+        return searcher == null ? trim(rows) : trim(searcher.postReconciliationProcessing(rowFilter, rows));
+    }
+
+    private List<Row> trim(List<Row> rows)
+    {
+        if (countCQL3Rows())
+            return rows;
+        else
+            return rows.size() > limit() ? rows.subList(0, limit()) : rows;
     }
 
     public String getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index e4a9ff8..ab6df1e 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -638,25 +638,11 @@ public class SecondaryIndexManager
      */
     public List<Row> search(ExtendedFilter filter)
     {
-        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(filter.getClause());
-
-        if (indexSearchers.isEmpty())
+        SecondaryIndexSearcher mostSelective = getHighestSelectivityIndexSearcher(filter.getClause());
+        if (mostSelective == null)
             return Collections.emptyList();
-
-        SecondaryIndexSearcher mostSelective = null;
-        long bestEstimate = Long.MAX_VALUE;
-        for (SecondaryIndexSearcher searcher : indexSearchers)
-        {
-            SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(filter.getClause());
-            long estimate = highestSelectivityIndex.estimateResultRows();
-            if (estimate <= bestEstimate)
-            {
-                bestEstimate = estimate;
-                mostSelective = searcher;
-            }
-        }
-
-        return mostSelective.search(filter);
+        else
+            return mostSelective.search(filter);
     }
 
     public Set<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
@@ -849,4 +835,30 @@ public class SecondaryIndexManager
         }
 
     }
+
+    public SecondaryIndexSearcher getHighestSelectivityIndexSearcher(List<IndexExpression> clause)
+    {
+        if (clause == null)
+            return null;
+
+        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
+
+        if (indexSearchers.isEmpty())
+            return null;
+
+        SecondaryIndexSearcher mostSelective = null;
+        long bestEstimate = Long.MAX_VALUE;
+        for (SecondaryIndexSearcher searcher : indexSearchers)
+        {
+            SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(clause);
+            long estimate = highestSelectivityIndex.estimateResultRows();
+            if (estimate <= bestEstimate)
+            {
+                bestEstimate = estimate;
+                mostSelective = searcher;
+            }
+        }
+
+        return mostSelective;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 93e0643..ab2cd75 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -43,7 +43,7 @@ public abstract class SecondaryIndexSearcher
 
     public SecondaryIndex highestSelectivityIndex(List<IndexExpression> clause)
     {
-        IndexExpression expr = highestSelectivityPredicate(clause);
+        IndexExpression expr = highestSelectivityPredicate(clause, false);
         return expr == null ? null : indexManager.getIndexForColumn(expr.column);
     }
 
@@ -77,7 +77,7 @@ public abstract class SecondaryIndexSearcher
     {
     }
 
-    protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
+    protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause, boolean includeInTrace)
     {
         IndexExpression best = null;
         int bestMeanCount = Integer.MAX_VALUE;
@@ -102,12 +102,40 @@ public abstract class SecondaryIndexSearcher
             }
         }
 
-        if (best == null)
-            Tracing.trace("No applicable indexes found");
-        else
-            Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.",
-                          FBUtilities.toString(candidates), indexManager.getIndexForColumn(best.column).getIndexName());
-
+        if (includeInTrace)
+        {
+            if (best == null)
+                Tracing.trace("No applicable indexes found");
+            else if (Tracing.isTracing())
+                // pay for an additional threadlocal get() rather than build the strings unnecessarily
+                Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.",
+                              FBUtilities.toString(candidates),
+                              indexManager.getIndexForColumn(best.column).getIndexName());
+        }
         return best;
     }
+
+    /**
+     * Returns {@code true} if the specified list of {@link IndexExpression}s require a full scan of all the nodes.
+     *
+     * @param clause A list of {@link IndexExpression}s
+     * @return {@code true} if the {@code IndexExpression}s require a full scan, {@code false} otherwise
+     */
+    public boolean requiresScanningAllRanges(List<IndexExpression> clause)
+    {
+        return false;
+    }
+
+    /**
+     * Combines index query results from multiple nodes. This is done by the coordinator node after it has reconciled
+     * the replica responses.
+     *
+     * @param clause A list of {@link IndexExpression}s
+     * @param rows The index query results to be combined
+     * @return The combination of the index query results
+     */
+    public List<Row> postReconciliationProcessing(List<IndexExpression> clause, List<Row> rows)
+    {
+        return rows;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 3e523f4..a2d08e7 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -59,7 +59,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
     public List<Row> search(ExtendedFilter filter)
     {
         assert filter.getClause() != null && !filter.getClause().isEmpty();
-        final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
+        final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
         final CompositesIndex index = (CompositesIndex)indexManager.getIndexForColumn(primary.column);
         // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
         // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 4055b7c..634bb0c 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -53,7 +53,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
     public List<Row> search(ExtendedFilter filter)
     {
         assert filter.getClause() != null && !filter.getClause().isEmpty();
-        final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
+        final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
         final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
         // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
         // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room  being made

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b41429e..1536e46 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1621,17 +1621,39 @@ public class StorageProxy implements StorageProxyMBean
             else
                 ranges = getRestrictedRanges(command.keyRange);
 
-            // our estimate of how many result rows there will be per-range
-            float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
-            // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
-            // fetch enough rows in the first round
-            resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
-            int concurrencyFactor = resultRowsPerRange == 0.0
+            // determine the number of rows to be fetched and the concurrency factor
+            int rowsToBeFetched = command.limit();
+            int concurrencyFactor;
+            if (command.requiresScanningAllRanges())
+            {
+                // all nodes must be queried
+                rowsToBeFetched *= ranges.size();
+                concurrencyFactor = ranges.size();
+                logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                             command.limit(),
+                             ranges.size(),
+                             concurrencyFactor);
+                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
+                              new Object[]{ ranges.size(), concurrencyFactor});
+            }
+            else
+            {
+                // our estimate of how many result rows there will be per-range
+                float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
+                // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+                // fetch enough rows in the first round
+                resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+                concurrencyFactor = resultRowsPerRange == 0.0
                                   ? 1
                                   : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
-            logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                         resultRowsPerRange, command.limit(), ranges.size(), concurrencyFactor);
-            Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", new Object[]{ ranges.size(), concurrencyFactor, resultRowsPerRange});
+                logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                             resultRowsPerRange,
+                             command.limit(),
+                             ranges.size(),
+                             concurrencyFactor);
+                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
+                              new Object[]{ ranges.size(), concurrencyFactor, resultRowsPerRange});
+            }
 
             boolean haveSufficientRows = false;
             int i = 0;
@@ -1723,7 +1745,6 @@ public class StorageProxy implements StorageProxyMBean
                 List<AsyncOneResponse> repairResponses = new ArrayList<>();
                 for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
                 {
-                    AbstractRangeCommand nodeCmd = cmdPairHandler.left;
                     ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
                     RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
 
@@ -1765,7 +1786,7 @@ public class StorageProxy implements StorageProxyMBean
 
                     // if we're done, great, otherwise, move to the next range
                     int count = countLiveRows ? liveRowCount : rows.size();
-                    if (count >= nodeCmd.limit())
+                    if (count >= rowsToBeFetched)
                     {
                         haveSufficientRows = true;
                         break;
@@ -1788,14 +1809,14 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 if (haveSufficientRows)
-                    return trim(command, rows);
+                    return command.postReconciliationProcessing(rows);
 
                 // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
                 // based on the results we've seen so far (as long as we still have ranges left to query)
                 if (i < ranges.size())
                 {
                     float fetchedRows = countLiveRows ? liveRowCount : rows.size();
-                    float remainingRows = command.limit() - fetchedRows;
+                    float remainingRows = rowsToBeFetched - fetchedRows;
                     float actualRowsPerRange;
                     if (fetchedRows == 0.0)
                     {
@@ -1819,16 +1840,7 @@ public class StorageProxy implements StorageProxyMBean
             rangeMetrics.addNano(latency);
             Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
         }
-        return trim(command, rows);
-    }
-
-    private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
-    {
-        // for CQL3 queries, let the caller trim the results
-        if (command.countCQL3Rows() || command.ignoredTombstonedPartitions())
-            return rows;
-        else
-            return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
+        return command.postReconciliationProcessing(rows);
     }
 
     public Map<String, List<String>> getSchemaVersions()

Reply via email to