fix KEYS index from skipping results
patch by Dmitry Petrashko; reviewed by Pavel Yaskevich for CASSANDRA-3996


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3db0a75d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3db0a75d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3db0a75d

Branch: refs/heads/trunk
Commit: 3db0a75d5bc971b219d08d0d514c97d6ac9c7aaf
Parents: c6f12ef
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Mar 13 14:10:13 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Tue Mar 13 14:12:28 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../db/index/MultiRowIndexSearcherIterator.java    |  225 +++++++++++++++
 .../cassandra/db/index/keys/KeysSearcher.java      |  136 ++--------
 3 files changed, 247 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db0a75d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ed5299..3372a05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 1.1.1-dev
  * optimize commitlog checksumming (CASSANDRA-3610)
  * identify and blacklist corrupted SSTables from future compactions 
(CASSANDRA-2261)
+ * fix KEYS index from skipping results (CASSANDRA-3996)
 
 
 1.1-dev

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db0a75d/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java 
b/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
new file mode 100644
index 0000000..ba7a023
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ExtendedFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * This class is a general searcher that visits rows returned by 
nextIndexKey();
+ */
+public abstract class MultiRowIndexSearcherIterator extends 
ColumnFamilyStore.AbstractScanIterator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MultiRowIndexSearcherIterator.class);
+    private static final Iterator<IColumn> EMPTY_ITERATOR = 
Collections.<IColumn>emptyList().iterator();
+
+    /* keys within a row */
+    protected final AbstractBounds<RowPosition> range;
+    private ByteBuffer lastSeenKey;
+    private final ByteBuffer startKey;
+    private final ByteBuffer endKey;
+
+    private Iterator<IColumn> currentIndexKeyData = null;
+    private final QueryPath path;
+
+    private final IndexExpression expression;
+    private final ExtendedFilter filter;
+    protected final ColumnFamilyStore indexCfs;
+    private final ColumnFamilyStore baseCfs;
+    private final boolean rightRangeIsNotMinimum;
+    protected DecoratedKey curIndexKey;
+
+    private final int rowsPerQuery;
+    private int columnsRead;
+
+
+    public MultiRowIndexSearcherIterator(IndexExpression expression,
+                                         ColumnFamilyStore baseCfs,
+                                         ColumnFamilyStore indexCfs,
+                                         ExtendedFilter filter,
+                                         AbstractBounds<RowPosition> range)
+    {
+        this.expression = expression;
+        this.baseCfs = baseCfs;
+        this.range = range;
+        this.filter = filter;
+        this.indexCfs = indexCfs;
+
+        /*
+        * XXX: If the range requested is a token range, we'll have to start at 
the beginning (and stop at the end) of
+        * the indexed row unfortunately (which will be inefficient), because 
we have no way to intuit the smallest
+        * possible key having a given token. A fix would be to actually store 
the token along the key in the
+        * indexed row.
+        */
+        startKey = range.left instanceof DecoratedKey ? ((DecoratedKey) 
range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        endKey = range.right instanceof DecoratedKey ? ((DecoratedKey) 
range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        int meanColumns = Math.max(indexCfs.getMeanColumns(), 1);
+
+        // We shouldn't fetch only 1 row as this provides buggy paging in case 
the first row doesn't satisfy all clauses
+        rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() 
/ meanColumns), 2);
+        rightRangeIsNotMinimum = !range.right.isMinimum(baseCfs.partitioner);
+        path = new QueryPath(baseCfs.columnFamily);
+
+    }
+
+    /**
+     * This function should return indexCfs keys in order they would be 
scanned by searcher
+     * @return next key for scanning or null if endOfData
+     */
+    protected abstract DecoratedKey nextIndexKey();
+
+    /**
+     * resets internal state preparing for next indexCfs row scan.
+     */
+    protected void resetState()
+    {
+        curIndexKey = nextIndexKey();
+        currentIndexKeyData = EMPTY_ITERATOR;
+        lastSeenKey = startKey;
+        columnsRead = Integer.MAX_VALUE;
+    }
+
+    protected Row computeNext()
+    {
+        if (currentIndexKeyData == null) // this is first call. Initialize
+            resetState();
+
+        Row result = null;
+        while (result == null && curIndexKey != null) // curIndexKey would be 
null when endOfData is reached
+        {
+            if (!currentIndexKeyData.hasNext()) // we've finished scanning row 
page
+            {
+                if (columnsRead < rowsPerQuery) // previously we've read less 
than we queried. No more pages to read within this row
+                {
+                    logger.debug("Read only {} (< {}) last page through, must 
be done", columnsRead, rowsPerQuery);
+                    resetState();
+                }
+                else
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug(String.format("Scanning index %s starting 
with %s",
+                                                   
expressionString(expression), 
baseCfs.metadata.getKeyValidator().getString(startKey)));
+
+                    QueryFilter indexFilter = 
QueryFilter.getSliceFilter(curIndexKey,
+                                                                         new 
QueryPath(indexCfs.getColumnFamilyName()),
+                                                                         
lastSeenKey,
+                                                                         
endKey,
+                                                                         false,
+                                                                         
rowsPerQuery);
+
+                    ColumnFamily indexRow = 
indexCfs.getColumnFamily(indexFilter); //get next row page
+
+                    if (indexRow != null)
+                    {
+                        Collection<IColumn> sortedColumns = 
indexRow.getSortedColumns();
+                        columnsRead = sortedColumns.size();
+                        currentIndexKeyData = sortedColumns.iterator();
+                        IColumn firstColumn = sortedColumns.iterator().next();
+
+                        // Paging is racy, so it is possible the first 
column_name of a page is not the last seen one.
+                        if (lastSeenKey != startKey && 
lastSeenKey.equals(firstColumn.name()))
+                        {
+                            // skip the row we already saw w/ the last page of 
results
+                            currentIndexKeyData.next();
+                            logger.debug("Skipping {}", 
baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
+                        }
+                        else if (range instanceof Range && 
currentIndexKeyData.hasNext() && firstColumn.name().equals(startKey))
+                        {
+                            // skip key excluded by range
+                            currentIndexKeyData.next();
+                            logger.debug("Skipping first key as range excludes 
it {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
+                        }
+                    }
+                    else // page is empty, nothing to scan within this row
+                    {
+                        columnsRead = 0;
+                        currentIndexKeyData = EMPTY_ITERATOR;
+                    }
+                }
+            }
+
+
+            while (result == null && currentIndexKeyData.hasNext()) // rolling 
through columns in page
+            {
+                IColumn column = currentIndexKeyData.next();
+                lastSeenKey = column.name();
+
+                if (column.isMarkedForDelete())
+                {
+                    logger.debug("Skipping {}", column);
+                    continue;
+                }
+
+                DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
+
+                if (rightRangeIsNotMinimum && range.right.compareTo(dk) < 0) 
// rightRangeIsNotMinimum is required to serve ring cycles
+                {
+                    logger.debug("Reached end of assigned scan range");
+                    resetState();
+                }
+                else if (range.contains(dk))
+                {
+                    logger.debug("Returning index hit for {}", dk);
+                    ColumnFamily data = baseCfs.getColumnFamily(new 
QueryFilter(dk, path, filter.initialFilter()));
+
+                    // While the column family we'll get in the end should 
contain the primary clause column_name,
+                    // the initialFilter may not have found it and can thus be 
null
+                    if (data == null)
+                        data = ColumnFamily.create(baseCfs.metadata);
+
+                    result = new Row(dk, data);
+                }
+                else
+                {
+                    logger.debug("Skipping entry {} outside of assigned scan 
range", dk.token);
+                }
+            }
+        }
+
+        return result == null ? endOfData() : result;
+    }
+
+    private String expressionString(IndexExpression expr)
+    {
+        return String.format("'%s.%s %s %s'",
+                             baseCfs.columnFamily,
+                             
baseCfs.getComparator().getString(expr.column_name),
+                             expr.op,
+                             
baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
+    }
+
+    public void close() throws IOException
+    {}
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db0a75d/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java 
b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index bd4be7e..0914c64 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -17,20 +17,18 @@
  */
 package org.apache.cassandra.db.index.keys;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.MultiRowIndexSearcherIterator;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,15 +64,6 @@ public class KeysSearcher extends SecondaryIndexSearcher
         return best;
     }
 
-    private String expressionString(IndexExpression expr)
-    {
-        return String.format("'%s.%s %s %s'",
-                             baseCfs.columnFamily,
-                             
baseCfs.getComparator().getString(expr.column_name),
-                             expr.op,
-                             
baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
-    }
-
     public boolean isIndexing(List<IndexExpression> clause)
     {
         return highestSelectivityPredicate(clause) != null;
@@ -95,115 +84,32 @@ public class KeysSearcher extends SecondaryIndexSearcher
         // TODO: allow merge join instead of just one index + loop
         final IndexExpression primary = 
highestSelectivityPredicate(filter.getClause());
         final SecondaryIndex index = 
indexManager.getIndexForColumn(primary.column_name);
+
         if (logger.isDebugEnabled())
             logger.debug("Primary scan clause is " + 
baseCfs.getComparator().getString(primary.column_name));
+
         assert index != null;
-        final DecoratedKey indexKey = 
indexManager.getIndexKeyFor(primary.column_name, primary.value);
+        return new KeysMultiRowIndexSearcherIterator(primary, filter, range, 
indexManager, index);
+    }
 
-        /*
-         * XXX: If the range requested is a token range, we'll have to start 
at the beginning (and stop at the end) of
-         * the indexed row unfortunately (which will be inefficient), because 
we have not way to intuit the small
-         * possible key having a given token. A fix would be to actually store 
the token along the key in the
-         * indexed row.
-         */
-        final ByteBuffer startKey = range.left instanceof DecoratedKey ? 
((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        final ByteBuffer endKey = range.right instanceof DecoratedKey ? 
((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+    public class KeysMultiRowIndexSearcherIterator extends 
MultiRowIndexSearcherIterator
+    {
+        final DecoratedKey indexKey;
 
-        return new ColumnFamilyStore.AbstractScanIterator()
+        public KeysMultiRowIndexSearcherIterator(IndexExpression expression,
+                                                 ExtendedFilter filter,
+                                                 AbstractBounds<RowPosition> 
range,
+                                                 SecondaryIndexManager 
indexManager,
+                                                 SecondaryIndex index)
         {
-            private ByteBuffer lastSeenKey = startKey;
-            private Iterator<IColumn> indexColumns;
-            private final QueryPath path = new QueryPath(baseCfs.columnFamily);
-            private int columnsRead = Integer.MAX_VALUE;
-
-            protected Row computeNext()
-            {
-                int meanColumns = 
Math.max(index.getIndexCfs().getMeanColumns(), 1);
-                // We shouldn't fetch only 1 row as this provides buggy paging 
in case the first row doesn't satisfy all clauses
-                int rowsPerQuery = Math.max(Math.min(filter.maxRows(), 
filter.maxColumns() / meanColumns), 2);
-                while (true)
-                {
-                    if (indexColumns == null || !indexColumns.hasNext())
-                    {
-                        if (columnsRead < rowsPerQuery)
-                        {
-                            logger.debug("Read only {} (< {}) last page 
through, must be done", columnsRead, rowsPerQuery);
-                            return endOfData();
-                        }
-
-                        if (logger.isDebugEnabled())
-                            logger.debug(String.format("Scanning index %s 
starting with %s",
-                                                       
expressionString(primary), 
index.getBaseCfs().metadata.getKeyValidator().getString(startKey)));
-
-                        QueryFilter indexFilter = 
QueryFilter.getSliceFilter(indexKey,
-                                                                             
new QueryPath(index.getIndexCfs().getColumnFamilyName()),
-                                                                             
lastSeenKey,
-                                                                             
endKey,
-                                                                             
false,
-                                                                             
rowsPerQuery);
-                        ColumnFamily indexRow = 
index.getIndexCfs().getColumnFamily(indexFilter);
-                        logger.debug("fetched {}", indexRow);
-                        if (indexRow == null)
-                        {
-                            logger.debug("no data, all done");
-                            return endOfData();
-                        }
-
-                        Collection<IColumn> sortedColumns = 
indexRow.getSortedColumns();
-                        columnsRead = sortedColumns.size();
-                        indexColumns = sortedColumns.iterator();
-                        IColumn firstColumn = sortedColumns.iterator().next();
-
-                        // Paging is racy, so it is possible the first column 
of a page is not the last seen one.
-                        if (lastSeenKey != startKey && 
lastSeenKey.equals(firstColumn.name()))
-                        {
-                            // skip the row we already saw w/ the last page of 
results
-                            indexColumns.next();
-                            columnsRead--;
-                            logger.debug("Skipping {}", 
baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
-                        }
-                        else if (range instanceof Range && 
indexColumns.hasNext() && firstColumn.name().equals(startKey))
-                        {
-                            // skip key excluded by range
-                            indexColumns.next();
-                            columnsRead--;
-                            logger.debug("Skipping first key as range excludes 
it");
-                        }
-                    }
-
-                    while (indexColumns.hasNext())
-                    {
-                        IColumn column = indexColumns.next();
-                        lastSeenKey = column.name();
-                        if (column.isMarkedForDelete())
-                        {
-                            logger.debug("skipping {}", column.name());
-                            continue;
-                        }
-
-                        DecoratedKey dk = 
baseCfs.partitioner.decorateKey(lastSeenKey);
-                        if (!range.right.isMinimum(baseCfs.partitioner) && 
range.right.compareTo(dk) < 0)
-                        {
-                            logger.debug("Reached end of assigned scan range");
-                            return endOfData();
-                        }
-                        if (!range.contains(dk))
-                        {
-                            logger.debug("Skipping entry {} outside of 
assigned scan range", dk.token);
-                            continue;
-                        }
-
-                        logger.debug("Returning index hit for {}", dk);
-                        ColumnFamily data = baseCfs.getColumnFamily(new 
QueryFilter(dk, path, filter.initialFilter()));
-                        // While the column family we'll get in the end should 
contains the primary clause column, the initialFilter may not have found it and 
can thus be null
-                        if (data == null)
-                            data = ColumnFamily.create(baseCfs.metadata);
-                        return new Row(dk, data);
-                    }
-                 }
-             }
+            super(expression, baseCfs, index.getIndexCfs(), filter, range);
+            indexKey = indexManager.getIndexKeyFor(expression.column_name, 
expression.value);
+        }
 
-            public void close() throws IOException {}
-        };
+        @Override
+        protected final DecoratedKey nextIndexKey()
+        {
+            return curIndexKey == null ? indexKey : null; // keys index always 
scans a single row in indexCfs
+        }
     }
 }

Reply via email to