Updated Branches: refs/heads/cassandra-1.1 c6f12ef74 -> 3db0a75d5
fix KEYS index from skipping results patch by Dmitry Petrashko; reviewed by Pavel Yaskevich for CASSANDRA-3996 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3db0a75d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3db0a75d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3db0a75d Branch: refs/heads/cassandra-1.1 Commit: 3db0a75d5bc971b219d08d0d514c97d6ac9c7aaf Parents: c6f12ef Author: Pavel Yaskevich <xe...@apache.org> Authored: Tue Mar 13 14:10:13 2012 +0300 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Tue Mar 13 14:12:28 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/index/MultiRowIndexSearcherIterator.java | 225 +++++++++++++++ .../cassandra/db/index/keys/KeysSearcher.java | 136 ++-------- 3 files changed, 247 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db0a75d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5ed5299..3372a05 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,7 @@ 1.1.1-dev * optimize commitlog checksumming (CASSANDRA-3610) * identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261) + * fix KEYS index from skipping results (CASSANDRA-3996) 1.1-dev http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db0a75d/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java b/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java new file mode 100644 index 0000000..ba7a023 --- /dev/null +++ b/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java @@ -0,0 
+1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.index; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ExtendedFilter; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.thrift.IndexExpression; +import org.apache.cassandra.utils.ByteBufferUtil; + + +/** + * This class is a general searcher that visits rows returned by nextIndexKey(); + */ +public abstract class MultiRowIndexSearcherIterator extends ColumnFamilyStore.AbstractScanIterator +{ + private static final Logger logger = LoggerFactory.getLogger(MultiRowIndexSearcherIterator.class); + private static final Iterator<IColumn> EMPTY_ITERATOR = Collections.<IColumn>emptyList().iterator(); + + /* keys within a row */ + protected final AbstractBounds<RowPosition> range; + private ByteBuffer lastSeenKey; + private final 
ByteBuffer startKey; + private final ByteBuffer endKey; + + private Iterator<IColumn> currentIndexKeyData = null; + private final QueryPath path; + + private final IndexExpression expression; + private final ExtendedFilter filter; + protected final ColumnFamilyStore indexCfs; + private final ColumnFamilyStore baseCfs; + private final boolean rightRangeIsNotMinimum; + protected DecoratedKey curIndexKey; + + private final int rowsPerQuery; + private int columnsRead; + + + public MultiRowIndexSearcherIterator(IndexExpression expression, + ColumnFamilyStore baseCfs, + ColumnFamilyStore indexCfs, + ExtendedFilter filter, + AbstractBounds<RowPosition> range) + { + this.expression = expression; + this.baseCfs = baseCfs; + this.range = range; + this.filter = filter; + this.indexCfs = indexCfs; + + /* + * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of + * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest + * possible key having a given token. A fix would be to actually store the token along the key in the + * indexed row. + */ + startKey = range.left instanceof DecoratedKey ? ((DecoratedKey) range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER; + endKey = range.right instanceof DecoratedKey ? 
((DecoratedKey) range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER; + + int meanColumns = Math.max(indexCfs.getMeanColumns(), 1); + + // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses + rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2); + rightRangeIsNotMinimum = !range.right.isMinimum(baseCfs.partitioner); + path = new QueryPath(baseCfs.columnFamily); + + } + + /** + * This function should return indexCfs keys in the order they would be scanned by searcher + * @return next key for scanning or null if endOfData + */ + protected abstract DecoratedKey nextIndexKey(); + + /** + * resets internal state preparing for next indexCfs row scan. + */ + protected void resetState() + { + curIndexKey = nextIndexKey(); + currentIndexKeyData = EMPTY_ITERATOR; + lastSeenKey = startKey; + columnsRead = Integer.MAX_VALUE; + } + + protected Row computeNext() + { + if (currentIndexKeyData == null) // this is first call. Initialize + resetState(); + + Row result = null; + while (result == null && curIndexKey != null) // curIndexKey would be null when endOfData is reached + { + if (!currentIndexKeyData.hasNext()) // we've finished scanning row page + { + if (columnsRead < rowsPerQuery) // previously we've read less than we queried. 
No more pages to read within this row + { + logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery); + resetState(); + } + else + { + if (logger.isDebugEnabled()) + logger.debug(String.format("Scanning index %s starting with %s", + expressionString(expression), baseCfs.metadata.getKeyValidator().getString(startKey))); + + QueryFilter indexFilter = QueryFilter.getSliceFilter(curIndexKey, + new QueryPath(indexCfs.getColumnFamilyName()), + lastSeenKey, + endKey, + false, + rowsPerQuery); + + ColumnFamily indexRow = indexCfs.getColumnFamily(indexFilter); //get next row page + + if (indexRow != null) + { + Collection<IColumn> sortedColumns = indexRow.getSortedColumns(); + columnsRead = sortedColumns.size(); + currentIndexKeyData = sortedColumns.iterator(); + IColumn firstColumn = sortedColumns.iterator().next(); + + // Paging is racy, so it is possible the first column_name of a page is not the last seen one. + if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name())) + { + // skip the row we already saw w/ the last page of results + currentIndexKeyData.next(); + logger.debug("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name())); + } + else if (range instanceof Range && currentIndexKeyData.hasNext() && firstColumn.name().equals(startKey)) + { + // skip key excluded by range + currentIndexKeyData.next(); + logger.debug("Skipping first key as range excludes it {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name())); + } + } + else // page is empty, nothing to scan within this row + { + columnsRead = 0; + currentIndexKeyData = EMPTY_ITERATOR; + } + } + } + + + while (result == null && currentIndexKeyData.hasNext()) // rolling through columns in page + { + IColumn column = currentIndexKeyData.next(); + lastSeenKey = column.name(); + + if (column.isMarkedForDelete()) + { + logger.debug("Skipping {}", column); + continue; + } + + DecoratedKey dk = 
baseCfs.partitioner.decorateKey(lastSeenKey); + + if (rightRangeIsNotMinimum && range.right.compareTo(dk) < 0) // rightRangeIsNotMinimum is required to serve ring cycles + { + logger.debug("Reached end of assigned scan range"); + resetState(); + } + else if (range.contains(dk)) + { + logger.debug("Returning index hit for {}", dk); + ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter())); + + // While the column family we'll get in the end should contains the primary clause column_name, + // the initialFilter may not have found it and can thus be null + if (data == null) + data = ColumnFamily.create(baseCfs.metadata); + + result = new Row(dk, data); + } + else + { + logger.debug("Skipping entry {} outside of assigned scan range", dk.token); + } + } + } + + return result == null ? endOfData() : result; + } + + private String expressionString(IndexExpression expr) + { + return String.format("'%s.%s %s %s'", + baseCfs.columnFamily, + baseCfs.getComparator().getString(expr.column_name), + expr.op, + baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value)); + } + + public void close() throws IOException + {} +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db0a75d/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index bd4be7e..0914c64 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -17,20 +17,18 @@ */ package org.apache.cassandra.db.index.keys; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.index.MultiRowIndexSearcherIterator; import 
org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.index.SecondaryIndexSearcher; import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.dht.Range; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexOperator; -import org.apache.cassandra.utils.ByteBufferUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,15 +64,6 @@ public class KeysSearcher extends SecondaryIndexSearcher return best; } - private String expressionString(IndexExpression expr) - { - return String.format("'%s.%s %s %s'", - baseCfs.columnFamily, - baseCfs.getComparator().getString(expr.column_name), - expr.op, - baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value)); - } - public boolean isIndexing(List<IndexExpression> clause) { return highestSelectivityPredicate(clause) != null; @@ -95,115 +84,32 @@ public class KeysSearcher extends SecondaryIndexSearcher // TODO: allow merge join instead of just one index + loop final IndexExpression primary = highestSelectivityPredicate(filter.getClause()); final SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name); + if (logger.isDebugEnabled()) logger.debug("Primary scan clause is " + baseCfs.getComparator().getString(primary.column_name)); + assert index != null; - final DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value); + return new KeysMultiRowIndexSearcherIterator(primary, filter, range, indexManager, index); + } - /* - * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of - * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small - * possible key having a given token. A fix would be to actually store the token along the key in the - * indexed row. 
- */ - final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER; - final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER; + public class KeysMultiRowIndexSearcherIterator extends MultiRowIndexSearcherIterator + { + final DecoratedKey indexKey; - return new ColumnFamilyStore.AbstractScanIterator() + public KeysMultiRowIndexSearcherIterator(IndexExpression expression, + ExtendedFilter filter, + AbstractBounds<RowPosition> range, + SecondaryIndexManager indexManager, + SecondaryIndex index) { - private ByteBuffer lastSeenKey = startKey; - private Iterator<IColumn> indexColumns; - private final QueryPath path = new QueryPath(baseCfs.columnFamily); - private int columnsRead = Integer.MAX_VALUE; - - protected Row computeNext() - { - int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1); - // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses - int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2); - while (true) - { - if (indexColumns == null || !indexColumns.hasNext()) - { - if (columnsRead < rowsPerQuery) - { - logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery); - return endOfData(); - } - - if (logger.isDebugEnabled()) - logger.debug(String.format("Scanning index %s starting with %s", - expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey))); - - QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey, - new QueryPath(index.getIndexCfs().getColumnFamilyName()), - lastSeenKey, - endKey, - false, - rowsPerQuery); - ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter); - logger.debug("fetched {}", indexRow); - if (indexRow == null) - { - logger.debug("no data, all done"); - return endOfData(); - } - - 
Collection<IColumn> sortedColumns = indexRow.getSortedColumns(); - columnsRead = sortedColumns.size(); - indexColumns = sortedColumns.iterator(); - IColumn firstColumn = sortedColumns.iterator().next(); - - // Paging is racy, so it is possible the first column of a page is not the last seen one. - if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name())) - { - // skip the row we already saw w/ the last page of results - indexColumns.next(); - columnsRead--; - logger.debug("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name())); - } - else if (range instanceof Range && indexColumns.hasNext() && firstColumn.name().equals(startKey)) - { - // skip key excluded by range - indexColumns.next(); - columnsRead--; - logger.debug("Skipping first key as range excludes it"); - } - } - - while (indexColumns.hasNext()) - { - IColumn column = indexColumns.next(); - lastSeenKey = column.name(); - if (column.isMarkedForDelete()) - { - logger.debug("skipping {}", column.name()); - continue; - } - - DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey); - if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0) - { - logger.debug("Reached end of assigned scan range"); - return endOfData(); - } - if (!range.contains(dk)) - { - logger.debug("Skipping entry {} outside of assigned scan range", dk.token); - continue; - } - - logger.debug("Returning index hit for {}", dk); - ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter())); - // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null - if (data == null) - data = ColumnFamily.create(baseCfs.metadata); - return new Row(dk, data); - } - } - } + super(expression, baseCfs, index.getIndexCfs(), filter, range); + indexKey = indexManager.getIndexKeyFor(expression.column_name, expression.value); + } - public void close() throws IOException {} 
- }; + @Override + protected final DecoratedKey nextIndexKey() + { + return curIndexKey == null ? indexKey : null; // keys index always scan single row in indexCfs + } } }