Author: jbellis Date: Tue Jun 15 13:29:07 2010 New Revision: 954875 URL: http://svn.apache.org/viewvc?rev=954875&view=rev Log: merge {SSTable,RowIndexed}Reader and {SSTable,RowIndexed}Scanner. Patch by jbellis; reviewed by gdusbabek for CASSANDRA-1127
Removed: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=954875&r1=954874&r2=954875&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Tue Jun 15 13:29:07 2010 @@ -26,6 +26,13 @@ import java.lang.ref.Reference; import java.nio.channels.FileChannel; import java.nio.MappedByteBuffer; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import org.apache.cassandra.io.util.BufferedRandomAccessFile; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.utils.BloomFilter; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,10 +51,13 @@ import org.apache.cassandra.io.util.File * SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen. * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead. 
*/ -public abstract class SSTableReader extends SSTable implements Comparable<SSTableReader> +public class SSTableReader extends SSTable implements Comparable<SSTableReader> { private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class); + // guesstimated size of INDEX_INTERVAL index entries + private static final int INDEX_FILE_BUFFER_BYTES = 16 * IndexSummary.INDEX_INTERVAL; + // `finalizers` is required to keep the PhantomReferences alive after the enclosing SSTR is itself // unreferenced. otherwise they will never get enqueued. private static final Set<Reference<SSTableReader>> finalizers = new HashSet<Reference<SSTableReader>>(); @@ -97,6 +107,14 @@ public abstract class SSTableReader exte */ public final long maxDataAge; + // indexfile and datafile: might be null before a call to load() + private SegmentedFile ifile; + private SegmentedFile dfile; + + private InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> keyCache; + + private volatile SSTableDeletingReference phantomReference; + public static int indexInterval() { return IndexSummary.INDEX_INTERVAL; @@ -144,7 +162,7 @@ public abstract class SSTableReader exte // FIXME: version conditional readers here if (true) { - sstable = RowIndexedReader.internalOpen(descriptor, partitioner); + sstable = internalOpen(descriptor, partitioner); } if (logger.isDebugEnabled()) @@ -153,39 +171,173 @@ public abstract class SSTableReader exte return sstable; } + /** Open a RowIndexedReader which needs its state loaded from disk. 
*/ + static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner) throws IOException + { + SSTableReader sstable = new SSTableReader(desc, partitioner, null, null, null, null, System.currentTimeMillis()); + + // versions before 'c' encoded keys as utf-16 before hashing to the filter + if (desc.versionCompareTo("c") < 0) + sstable.load(true); + else + { + sstable.load(false); + sstable.loadBloomFilter(); + } + + return sstable; + } + + /** + * Open a RowIndexedReader which already has its state initialized (by SSTableWriter). + */ + static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge) throws IOException + { + assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null; + return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge); + } + + SSTableReader(Descriptor desc, + IPartitioner partitioner, + SegmentedFile ifile, + SegmentedFile dfile, + IndexSummary indexSummary, + BloomFilter bloomFilter, + long maxDataAge) + throws IOException + { + super(desc, partitioner); + this.maxDataAge = maxDataAge; + + + this.ifile = ifile; + this.dfile = dfile; + this.indexSummary = indexSummary; + this.bf = bloomFilter; + } + public void setTrackedBy(SSTableTracker tracker) { phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue); finalizers.add(phantomReference); + keyCache = tracker.getKeyCache(); } - protected SSTableReader(Descriptor desc, IPartitioner partitioner, long maxDataAge) + void loadBloomFilter() throws IOException { - super(desc, partitioner); - this.maxDataAge = maxDataAge; + DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename())); + try + { + bf = BloomFilter.serializer().deserialize(stream); + } + finally + { + stream.close(); + } } - private volatile SSTableDeletingReference phantomReference; + /** + * 
Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter. + */ + private void load(boolean recreatebloom) throws IOException + { + SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(); + SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(); + + // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. + indexSummary = new IndexSummary(); + BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r"); + try + { + long indexSize = input.length(); + if (recreatebloom) + // estimate key count based on index length + bf = BloomFilter.getFilter((int)(input.length() / 32), 15); + while (true) + { + long indexPosition = input.getFilePointer(); + if (indexPosition == indexSize) + break; + + DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input)); + if (recreatebloom) + bf.add(decoratedKey.key); + long dataPosition = input.readLong(); + + indexSummary.maybeAddEntry(decoratedKey, indexPosition); + ibuilder.addPotentialBoundary(indexPosition); + dbuilder.addPotentialBoundary(dataPosition); + } + indexSummary.complete(); + } + finally + { + input.close(); + } + + // finalize the state of the reader + indexSummary.complete(); + ifile = ibuilder.complete(indexFilename()); + dfile = dbuilder.complete(getFilename()); + } + + /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */ + private IndexSummary.KeyPosition getIndexScanPosition(DecoratedKey decoratedKey) + { + assert indexSummary.getIndexPositions() != null && indexSummary.getIndexPositions().size() > 0; + int index = Collections.binarySearch(indexSummary.getIndexPositions(), new IndexSummary.KeyPosition(decoratedKey, -1)); + if (index < 0) + { + // binary search gives us the first index _greater_ than the key searched for, + // i.e., its insertion position + int greaterThan = (index + 1) * -1; + if (greaterThan == 0) + 
return null; + return indexSummary.getIndexPositions().get(greaterThan - 1); + } + else + { + return indexSummary.getIndexPositions().get(index); + } + } /** * For testing purposes only. */ - public abstract void forceFilterFailures(); + public void forceFilterFailures() + { + bf = BloomFilter.alwaysMatchingBloomFilter(); + } /** * @return The key cache: for monitoring purposes. */ - public abstract InstrumentedCache getKeyCache(); + public InstrumentedCache getKeyCache() + { + return keyCache; + } /** * @return An estimate of the number of keys in this SSTable. */ - public abstract long estimatedKeys(); + public long estimatedKeys() + { + return indexSummary.getIndexPositions().size() * IndexSummary.INDEX_INTERVAL; + } /** * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable. */ - public abstract Collection<DecoratedKey> getKeySamples(); + public Collection<DecoratedKey> getKeySamples() + { + return Collections2.transform(indexSummary.getIndexPositions(), + new Function<IndexSummary.KeyPosition, DecoratedKey>(){ + public DecoratedKey apply(IndexSummary.KeyPosition kp) + { + return kp.key; + } + }); + } /** * Returns the position in the data file to find the given key, or -1 if the @@ -193,7 +345,71 @@ public abstract class SSTableReader exte * FIXME: should not be public: use Scanner. 
*/ @Deprecated - public abstract long getPosition(DecoratedKey decoratedKey) throws IOException; + public long getPosition(DecoratedKey decoratedKey) + { + // first, check bloom filter + if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey))) + return -1; + + // next, the key cache + Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor, DecoratedKey>(desc, decoratedKey); + if (keyCache != null && keyCache.getCapacity() > 0) + { + Long cachedPosition = keyCache.get(unifiedKey); + if (cachedPosition != null) + { + return cachedPosition; + } + } + + // next, see if the sampled index says it's impossible for the key to be present + IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey); + if (sampledPosition == null) + return -1; + + // scan the on-disk index, starting at the nearest sampled position + int i = 0; + Iterator<FileDataInput> segments = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES); + while (segments.hasNext()) + { + FileDataInput input = segments.next(); + try + { + while (!input.isEOF() && i++ < IndexSummary.INDEX_INTERVAL) + { + // read key & data position from index entry + DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input)); + long dataPosition = input.readLong(); + + int v = indexDecoratedKey.compareTo(decoratedKey); + if (v == 0) + { + if (keyCache != null && keyCache.getCapacity() > 0) + keyCache.put(unifiedKey, Long.valueOf(dataPosition)); + return dataPosition; + } + if (v > 0) + return -1; + } + } + catch (IOException e) + { + throw new IOError(e); + } + finally + { + try + { + input.close(); + } + catch (IOException e) + { + logger.error("error closing file", e); + } + } + } + return -1; + } /** * Like getPosition, but if key is not found will return the location of the @@ -201,12 +417,54 @@ public abstract class SSTableReader exte * FIXME: should not be public: use Scanner. 
*/ @Deprecated - public abstract long getNearestPosition(DecoratedKey decoratedKey) throws IOException; + public long getNearestPosition(DecoratedKey decoratedKey) + { + IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey); + if (sampledPosition == null) + return 0; + + // scan the on-disk index, starting at the nearest sampled position + Iterator<FileDataInput> segiter = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES); + while (segiter.hasNext()) + { + FileDataInput input = segiter.next(); + try + { + while (!input.isEOF()) + { + DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input)); + long position = input.readLong(); + int v = indexDecoratedKey.compareTo(decoratedKey); + if (v >= 0) + return position; + } + } + catch (IOException e) + { + throw new IOError(e); + } + finally + { + try + { + input.close(); + } + catch (IOException e) + { + logger.error("error closing file", e); + } + } + } + return -1; + } /** * @return The length in bytes of the data file for this SSTable. */ - public abstract long length(); + public long length() + { + return dfile.length; + } public void markCompacted() { @@ -228,16 +486,35 @@ public abstract class SSTableReader exte * @param bufferSize Buffer size in bytes for this Scanner. * @return A Scanner for seeking over the rows of the SSTable. */ - public abstract SSTableScanner getScanner(int bufferSize); + public SSTableScanner getScanner(int bufferSize) + { + return new SSTableScanner(this, bufferSize); + } /** * @param bufferSize Buffer size in bytes for this Scanner. * @param filter filter to use when reading the columns * @return A Scanner for seeking over the rows of the SSTable. 
*/ - public abstract SSTableScanner getScanner(int bufferSize, QueryFilter filter); - - public abstract FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize); + public SSTableScanner getScanner(int bufferSize, QueryFilter filter) + { + return new SSTableScanner(this, filter, bufferSize); + } + + public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize) + { + long position = getPosition(decoratedKey); + if (position < 0) + return null; + + return dfile.getSegment(position, bufferSize); + } + + + public int compareTo(SSTableReader o) + { + return desc.generation - o.desc.generation; + } public AbstractType getColumnComparator() { Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=954875&r1=954874&r2=954875&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Tue Jun 15 13:29:07 2010 @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,17 +20,177 @@ package org.apache.cassandra.io.sstable; import java.io.Closeable; +import java.io.IOException; +import java.io.IOError; import java.util.Iterator; +import java.util.Arrays; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.filter.IColumnIterator; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.io.util.BufferedRandomAccessFile; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public abstract class SSTableScanner implements Iterator<IColumnIterator>, Closeable -{ - public abstract void seekTo(DecoratedKey seekKey); - public abstract long getFileLength(); +public class SSTableScanner implements Iterator<IColumnIterator>, Closeable +{ + private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class); - public abstract long getFilePointer(); + private final BufferedRandomAccessFile file; + private final SSTableReader sstable; + private IColumnIterator row; + private boolean exhausted = false; + private Iterator<IColumnIterator> iterator; + private QueryFilter filter; + + /** + * @param sstable SSTable to scan. + */ + SSTableScanner(SSTableReader sstable, int bufferSize) + { + try + { + this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize); + } + catch (IOException e) + { + throw new IOError(e); + } + this.sstable = sstable; + } + + /** + * @param sstable SSTable to scan. 
+ * @param filter filter to use when scanning the columns + */ + SSTableScanner(SSTableReader sstable, QueryFilter filter, int bufferSize) + { + try + { + this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize); + } + catch (IOException e) + { + throw new IOError(e); + } + this.sstable = sstable; + this.filter = filter; + } + + public void close() throws IOException + { + file.close(); + } + + public void seekTo(DecoratedKey seekKey) + { + try + { + long position = sstable.getNearestPosition(seekKey); + if (position < 0) + { + exhausted = true; + return; + } + file.seek(position); + row = null; + } + catch (IOException e) + { + throw new RuntimeException("corrupt sstable", e); + } + } + + public long getFileLength() + { + try + { + return file.length(); + } + catch (IOException e) + { + throw new IOError(e); + } + } + + public long getFilePointer() + { + return file.getFilePointer(); + } + + public boolean hasNext() + { + if (iterator == null) + iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new KeyScanningIterator(); + return iterator.hasNext(); + } + + public IColumnIterator next() + { + if (iterator == null) + iterator = exhausted ? 
Arrays.asList(new IColumnIterator[0]).iterator() : new KeyScanningIterator(); + return iterator.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + private class KeyScanningIterator implements Iterator<IColumnIterator> + { + private long dataStart; + private long finishedAt; + + public boolean hasNext() + { + try + { + if (row == null) + return !file.isEOF(); + return finishedAt < file.length(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public IColumnIterator next() + { + try + { + if (row != null) + file.seek(finishedAt); + assert !file.isEOF(); + + DecoratedKey key = StorageService.getPartitioner().convertFromDiskFormat(FBUtilities.readShortByteArray(file)); + int dataSize = file.readInt(); + dataStart = file.getFilePointer(); + finishedAt = dataStart + dataSize; + + if (filter == null) + { + return row = new SSTableIdentityIterator(sstable, file, key, dataStart, finishedAt); + } + else + { + return row = filter.getSSTableColumnIterator(sstable, file, key, dataStart); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=954875&r1=954874&r2=954875&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jun 15 13:29:07 2010 @@ -163,7 +163,7 @@ public class SSTableWriter extends SSTab SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA)); ibuilder = null; dbuilder = null; - return RowIndexedReader.internalOpen(newdesc, partitioner, ifile, dfile, 
indexSummary, bf, maxDataAge); + return SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, indexSummary, bf, maxDataAge); } static Descriptor rename(Descriptor tmpdesc)