Author: jbellis Date: Mon Aug 2 15:06:01 2010 New Revision: 981544 URL: http://svn.apache.org/viewvc?rev=981544&view=rev Log: Split out SimpleColumnReader and ColumnGroupReader from SSTableSliceIterator as SimpleSliceReader and IndexedSliceReader. Patch by jbellis; reviewed by gdusbabek for CASSANDRA-1338.
Added: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Added: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java?rev=981544&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java Mon Aug 2 15:06:01 2010 @@ -0,0 +1,167 @@ +package org.apache.cassandra.db.columniterator; + +import java.io.IOError; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; + +import com.google.common.collect.AbstractIterator; + +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileMark; + +/** + * This is a reader that finds the block for a starting column and returns + * blocks before/after it for each next call. This function assumes that + * the CF is sorted by name and exploits the name index. 
+ */ +class IndexedSliceReader extends AbstractIterator<IColumn> implements IColumnIterator +{ + private final ColumnFamily emptyColumnFamily; + + private final List<IndexHelper.IndexInfo> indexes; + private final FileDataInput file; + private final byte[] startColumn; + private final byte[] finishColumn; + private final boolean reversed; + + private int curRangeIndex; + private Deque<IColumn> blockColumns = new ArrayDeque<IColumn>(); + private final FileMark mark; + private AbstractType comparator; + + public IndexedSliceReader(SSTableReader sstable, FileDataInput input, byte[] startColumn, byte[] finishColumn, boolean reversed) + { + this.file = input; + this.startColumn = startColumn; + this.finishColumn = finishColumn; + this.reversed = reversed; + comparator = sstable.getColumnComparator(); + try + { + IndexHelper.skipBloomFilter(file); + indexes = IndexHelper.deserializeIndex(file); + + emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(sstable.makeColumnFamily(), file); + file.readInt(); // column count + } + catch (IOException e) + { + throw new IOError(e); + } + this.mark = file.mark(); + curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed); + if (reversed && curRangeIndex == indexes.size()) + curRangeIndex--; + } + + public ColumnFamily getColumnFamily() + { + return emptyColumnFamily; + } + + public DecoratedKey getKey() + { + throw new UnsupportedOperationException(); + } + + private boolean isColumnNeeded(IColumn column) + { + if (startColumn.length == 0 && finishColumn.length == 0) + return true; + else if (startColumn.length == 0 && !reversed) + return comparator.compare(column.name(), finishColumn) <= 0; + else if (startColumn.length == 0 && reversed) + return comparator.compare(column.name(), finishColumn) >= 0; + else if (finishColumn.length == 0 && !reversed) + return comparator.compare(column.name(), startColumn) >= 0; + else if (finishColumn.length == 0 && reversed) + return 
comparator.compare(column.name(), startColumn) <= 0; + else if (!reversed) + return comparator.compare(column.name(), startColumn) >= 0 && comparator.compare(column.name(), finishColumn) <= 0; + else // if reversed + return comparator.compare(column.name(), startColumn) <= 0 && comparator.compare(column.name(), finishColumn) >= 0; + } + + protected IColumn computeNext() + { + while (true) + { + IColumn column = blockColumns.poll(); + if (column != null && isColumnNeeded(column)) + return column; + try + { + if (column == null && !getNextBlock()) + return endOfData(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + public boolean getNextBlock() throws IOException + { + if (curRangeIndex < 0 || curRangeIndex >= indexes.size()) + return false; + + /* seek to the correct offset to the data, and calculate the data size */ + IndexHelper.IndexInfo curColPosition = indexes.get(curRangeIndex); + + /* see if this read is really necessary. */ + if (reversed) + { + if ((finishColumn.length > 0 && comparator.compare(finishColumn, curColPosition.lastName) > 0) || + (startColumn.length > 0 && comparator.compare(startColumn, curColPosition.firstName) < 0)) + return false; + } + else + { + if ((startColumn.length > 0 && comparator.compare(startColumn, curColPosition.lastName) > 0) || + (finishColumn.length > 0 && comparator.compare(finishColumn, curColPosition.firstName) < 0)) + return false; + } + + boolean outOfBounds = false; + + file.reset(mark); + long curOffset = file.skipBytes((int) curColPosition.offset); + assert curOffset == curColPosition.offset; + while (file.bytesPastMark(mark) < curColPosition.offset + curColPosition.width && !outOfBounds) + { + IColumn column = emptyColumnFamily.getColumnSerializer().deserialize(file); + if (reversed) + blockColumns.addFirst(column); + else + blockColumns.addLast(column); + + /* see if we can stop seeking. 
*/ + if (!reversed && finishColumn.length > 0) + outOfBounds = comparator.compare(column.name(), finishColumn) >= 0; + else if (reversed && startColumn.length > 0) + outOfBounds = comparator.compare(column.name(), startColumn) >= 0; + + if (outOfBounds) + break; + } + + if (reversed) + curRangeIndex--; + else + curRangeIndex++; + return true; + } + + public void close() throws IOException + { + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java?rev=981544&r1=981543&r2=981544&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java Mon Aug 2 15:06:01 2010 @@ -21,7 +21,6 @@ package org.apache.cassandra.db.columnit */ -import java.util.*; import java.io.IOError; import java.io.IOException; @@ -29,13 +28,9 @@ import org.apache.cassandra.config.Datab import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.FileMark; -import com.google.common.collect.AbstractIterator; import org.apache.cassandra.utils.FBUtilities; /** @@ -43,10 +38,7 @@ import org.apache.cassandra.utils.FBUtil */ public class SSTableSliceIterator implements IColumnIterator { - private final boolean reversed; - private final byte[] startColumn; - private final byte[] finishColumn; - private final AbstractType comparator; + private final FileDataInput file; private IColumnIterator reader; 
private boolean closeFileWhenDone = false; private DecoratedKey decoratedKey; @@ -70,10 +62,7 @@ public class SSTableSliceIterator implem */ public SSTableSliceIterator(SSTableReader ssTable, FileDataInput file, DecoratedKey key, byte[] startColumn, byte[] finishColumn, boolean reversed) { - this.reversed = reversed; - this.comparator = ssTable.getColumnComparator(); - this.startColumn = startColumn; - this.finishColumn = finishColumn; + this.file = file; this.decoratedKey = key; if (file == null) @@ -97,7 +86,9 @@ public class SSTableSliceIterator implem } } - reader = startColumn.length == 0 && !reversed ? new SimpleColumnReader(ssTable, file) : new ColumnGroupReader(ssTable, file); + reader = startColumn.length == 0 && !reversed + ? new SimpleSliceReader(ssTable, file, finishColumn) + : new IndexedSliceReader(ssTable, file, startColumn, finishColumn, reversed); } public DecoratedKey getKey() @@ -127,214 +118,8 @@ public class SSTableSliceIterator implem public void close() throws IOException { - if (reader != null) - reader.close(); - } - - private class SimpleColumnReader extends AbstractIterator<IColumn> implements IColumnIterator - { - private final FileDataInput file; - private final ColumnFamily emptyColumnFamily; - private final int columns; - private int i; - private FileMark mark; - - public SimpleColumnReader(SSTableReader ssTable, FileDataInput input) - { - this.file = input; - try - { - IndexHelper.skipBloomFilter(file); - IndexHelper.skipIndex(file); - - emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ssTable.makeColumnFamily(), file); - columns = file.readInt(); - mark = file.mark(); - } - catch (IOException e) - { - throw new IOError(e); - } - } - - protected IColumn computeNext() - { - if (i++ >= columns) - return endOfData(); - - IColumn column; - try - { - file.reset(mark); - column = emptyColumnFamily.getColumnSerializer().deserialize(file); - } - catch (IOException e) - { - throw new RuntimeException("error 
reading " + i + " of " + columns, e); - } - if (finishColumn.length > 0 && comparator.compare(column.name(), finishColumn) > 0) - return endOfData(); - - mark = file.mark(); - return column; - } - - public ColumnFamily getColumnFamily() throws IOException - { - return emptyColumnFamily; - } - - public void close() throws IOException - { + if (closeFileWhenDone) file.close(); - } - - public DecoratedKey getKey() - { - throw new UnsupportedOperationException(); - } } - /** - * This is a reader that finds the block for a starting column and returns - * blocks before/after it for each next call. This function assumes that - * the CF is sorted by name and exploits the name index. - */ - class ColumnGroupReader extends AbstractIterator<IColumn> implements IColumnIterator - { - private final ColumnFamily emptyColumnFamily; - - private final List<IndexHelper.IndexInfo> indexes; - private final FileDataInput file; - - private int curRangeIndex; - private Deque<IColumn> blockColumns = new ArrayDeque<IColumn>(); - private final FileMark mark; - - public ColumnGroupReader(SSTableReader ssTable, FileDataInput input) - { - this.file = input; - try - { - IndexHelper.skipBloomFilter(file); - indexes = IndexHelper.deserializeIndex(file); - - emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ssTable.makeColumnFamily(), file); - file.readInt(); // column count - } - catch (IOException e) - { - throw new IOError(e); - } - this.mark = file.mark(); - curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed); - if (reversed && curRangeIndex == indexes.size()) - curRangeIndex--; - } - - public ColumnFamily getColumnFamily() - { - return emptyColumnFamily; - } - - public DecoratedKey getKey() - { - throw new UnsupportedOperationException(); - } - - private boolean isColumnNeeded(IColumn column) - { - if (startColumn.length == 0 && finishColumn.length == 0) - return true; - else if (startColumn.length == 0 && !reversed) - return 
comparator.compare(column.name(), finishColumn) <= 0; - else if (startColumn.length == 0 && reversed) - return comparator.compare(column.name(), finishColumn) >= 0; - else if (finishColumn.length == 0 && !reversed) - return comparator.compare(column.name(), startColumn) >= 0; - else if (finishColumn.length == 0 && reversed) - return comparator.compare(column.name(), startColumn) <= 0; - else if (!reversed) - return comparator.compare(column.name(), startColumn) >= 0 && comparator.compare(column.name(), finishColumn) <= 0; - else // if reversed - return comparator.compare(column.name(), startColumn) <= 0 && comparator.compare(column.name(), finishColumn) >= 0; - } - - protected IColumn computeNext() - { - while (true) - { - IColumn column = blockColumns.poll(); - if (column != null && isColumnNeeded(column)) - return column; - try - { - if (column == null && !getNextBlock()) - return endOfData(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - - public boolean getNextBlock() throws IOException - { - if (curRangeIndex < 0 || curRangeIndex >= indexes.size()) - return false; - - /* seek to the correct offset to the data, and calculate the data size */ - IndexHelper.IndexInfo curColPosition = indexes.get(curRangeIndex); - - /* see if this read is really necessary. 
*/ - if (reversed) - { - if ((finishColumn.length > 0 && comparator.compare(finishColumn, curColPosition.lastName) > 0) || - (startColumn.length > 0 && comparator.compare(startColumn, curColPosition.firstName) < 0)) - return false; - } - else - { - if ((startColumn.length > 0 && comparator.compare(startColumn, curColPosition.lastName) > 0) || - (finishColumn.length > 0 && comparator.compare(finishColumn, curColPosition.firstName) < 0)) - return false; - } - - boolean outOfBounds = false; - - file.reset(mark); - long curOffset = file.skipBytes((int) curColPosition.offset); - assert curOffset == curColPosition.offset; - while (file.bytesPastMark(mark) < curColPosition.offset + curColPosition.width && !outOfBounds) - { - IColumn column = emptyColumnFamily.getColumnSerializer().deserialize(file); - if (reversed) - blockColumns.addFirst(column); - else - blockColumns.addLast(column); - - /* see if we can stop seeking. */ - if (!reversed && finishColumn.length > 0) - outOfBounds = comparator.compare(column.name(), finishColumn) >= 0; - else if (reversed && startColumn.length > 0) - outOfBounds = comparator.compare(column.name(), startColumn) >= 0; - - if (outOfBounds) - break; - } - - if (reversed) - curRangeIndex--; - else - curRangeIndex++; - return true; - } - - public void close() throws IOException - { - if(closeFileWhenDone) - file.close(); - } - } } Added: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java?rev=981544&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java Mon Aug 2 15:06:01 2010 @@ -0,0 +1,82 @@ +package org.apache.cassandra.db.columniterator; + +import java.io.IOError; 
+import java.io.IOException; + +import com.google.common.collect.AbstractIterator; + +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileMark; + +class SimpleSliceReader extends AbstractIterator<IColumn> implements IColumnIterator +{ + private final FileDataInput file; + private final byte[] finishColumn; + private final AbstractType comparator; + private final ColumnFamily emptyColumnFamily; + private final int columns; + private int i; + private FileMark mark; + + public SimpleSliceReader(SSTableReader sstable, FileDataInput input, byte[] finishColumn) + { + this.file = input; + this.finishColumn = finishColumn; + comparator = sstable.getColumnComparator(); + try + { + IndexHelper.skipBloomFilter(file); + IndexHelper.skipIndex(file); + + emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(sstable.makeColumnFamily(), file); + columns = file.readInt(); + mark = file.mark(); + } + catch (IOException e) + { + throw new IOError(e); + } + } + + protected IColumn computeNext() + { + if (i++ >= columns) + return endOfData(); + + IColumn column; + try + { + file.reset(mark); + column = emptyColumnFamily.getColumnSerializer().deserialize(file); + } + catch (IOException e) + { + throw new RuntimeException("error reading " + i + " of " + columns, e); + } + if (finishColumn.length > 0 && comparator.compare(column.name(), finishColumn) > 0) + return endOfData(); + + mark = file.mark(); + return column; + } + + public ColumnFamily getColumnFamily() throws IOException + { + return emptyColumnFamily; + } + + public void close() throws IOException + { + } + + public DecoratedKey getKey() + { + throw new 
UnsupportedOperationException(); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=981544&r1=981543&r2=981544&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Mon Aug 2 15:06:01 2010 @@ -57,9 +57,9 @@ public abstract class SSTable public static final String COMPONENT_COMPACTED = "Compacted"; - protected Descriptor desc; + protected final Descriptor desc; protected final CFMetaData metadata; - protected IPartitioner partitioner; + protected final IPartitioner partitioner; public static final String TEMPFILE_MARKER = "tmp";