http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java index d69eb16..22cce77 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Iterator; import org.apache.cassandra.db.columniterator.IColumnIterator; +import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.utils.Pair; @@ -44,7 +45,7 @@ public class SSTableBoundedScanner extends SSTableScanner currentRange = rangeIterator.next(); try { - file.seek(currentRange.left); + dfile.seek(currentRange.left); } catch (IOException e) { @@ -58,6 +59,16 @@ public class SSTableBoundedScanner extends SSTableScanner } } + /* + * This shouldn't be used with a bounded scanner as it could put the + * bounded scanner outside its range. + */ + @Override + public void seekTo(RowPosition seekKey) + { + throw new UnsupportedOperationException(); + } + @Override public boolean hasNext() {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index 695e966..f5895a2 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -110,7 +110,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat if (dataStart + dataSize > file.length()) throw new IOException(String.format("dataSize of %s starting at %s would be larger than file %s length %s", dataSize, dataStart, file.getPath(), file.length())); - if (checkData) + if (checkData && !sstable.descriptor.hasPromotedIndexes) { try { @@ -137,8 +137,11 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat } } - IndexHelper.skipBloomFilter(inputWithTracker); - IndexHelper.skipIndex(inputWithTracker); + if (!sstable.descriptor.hasPromotedIndexes) + { + IndexHelper.skipBloomFilter(inputWithTracker); + IndexHelper.skipIndex(inputWithTracker); + } columnFamily = ColumnFamily.create(metadata); ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, inputWithTracker); columnCount = inputWithTracker.readInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 072b6f7..a41d2a7 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -82,7 +82,7 @@ public class SSTableReader extends 
SSTable private IndexSummary indexSummary; private Filter bf; - private InstrumentingCache<KeyCacheKey, Long> keyCache; + private InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache; private final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker(); @@ -353,7 +353,7 @@ public class SSTableReader extends SSTable long histogramCount = sstableMetadata.estimatedRowSize.count(); long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed() ? histogramCount - : SSTable.estimateRowsFromIndex(input); // statistics is supposed to be optional + : estimateRowsFromIndex(input); // statistics is supposed to be optional indexSummary = new IndexSummary(estimatedKeys); if (recreatebloom) bf = LegacyBloomFilter.getFilter(estimatedKeys, 15); @@ -377,7 +377,7 @@ public class SSTableReader extends SSTable left = decodeKey(partitioner, descriptor, skippedKey); right = decodeKey(partitioner, descriptor, skippedKey); - long dataPosition = input.readLong(); + RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(input, descriptor); if (key != null) { DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, key); @@ -387,12 +387,12 @@ public class SSTableReader extends SSTable indexSummary.addEntry(decoratedKey, indexPosition); // if key cache could be used and we have key already pre-loaded if (cacheLoading && keysToLoadInCache.contains(decoratedKey)) - cacheKey(decoratedKey, dataPosition); + cacheKey(decoratedKey, indexEntry); } indexSummary.incrementRowid(); ibuilder.addPotentialBoundary(indexPosition); - dbuilder.addPotentialBoundary(dataPosition); + dbuilder.addPotentialBoundary(indexEntry.position); } indexSummary.complete(); } @@ -409,7 +409,7 @@ public class SSTableReader extends SSTable } /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */ - private long getIndexScanPosition(RowPosition key) + public long getIndexScanPosition(RowPosition key) { assert 
indexSummary.getKeys() != null && indexSummary.getKeys().size() > 0; int index = Collections.binarySearch(indexSummary.getKeys(), key); @@ -597,11 +597,13 @@ public class SSTableReader extends SSTable for (Range<Token> range : Range.normalize(ranges)) { AbstractBounds<RowPosition> keyRange = range.toRowBounds(); - long left = getPosition(keyRange.left, Operator.GT); + RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT); + long left = idxLeft == null ? -1 : idxLeft.position; if (left == -1) // left is past the end of the file continue; - long right = getPosition(keyRange.right, Operator.GT); + RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT); + long right = idxRight == null ? -1 : idxRight.position; if (right == -1 || Range.isWrapAround(range.left, range.right)) // right is past the end of the file, or it wraps right = uncompressedLength(); @@ -613,7 +615,7 @@ public class SSTableReader extends SSTable return positions; } - public void cacheKey(DecoratedKey key, Long info) + public void cacheKey(DecoratedKey key, RowIndexEntry info) { CFMetaData.Caching caching = metadata.getCaching(); @@ -627,12 +629,12 @@ public class SSTableReader extends SSTable keyCache.put(new KeyCacheKey(descriptor, ByteBufferUtil.clone(key.key)), info); } - public Long getCachedPosition(DecoratedKey key, boolean updateStats) + public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats) { return getCachedPosition(new KeyCacheKey(descriptor, key.key), updateStats); } - private Long getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats) + private RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats) { if (keyCache != null && keyCache.getCapacity() > 0) return updateStats ? keyCache.get(unifiedKey) : keyCache.getInternal(unifiedKey); @@ -643,23 +645,23 @@ public class SSTableReader extends SSTable * @param key The key to apply as the rhs to the given Operator. 
A 'fake' key is allowed to * allow key selection by token bounds but only if op != * EQ * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. - * @return The position in the data file to find the key, or -1 if the key is not present + * @return The index entry corresponding to the key, or null if the key is not present */ - public long getPosition(RowPosition key, Operator op) + public RowIndexEntry getPosition(RowPosition key, Operator op) { // first, check bloom filter if (op == Operator.EQ) { assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key if (!bf.isPresent(((DecoratedKey)key).key)) - return -1; + return null; } // next, the key cache (only make sense for valid row key) if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey)) { DecoratedKey decoratedKey = (DecoratedKey)key; - Long cachedPosition = getCachedPosition(new KeyCacheKey(descriptor, decoratedKey.key), true); + RowIndexEntry cachedPosition = getCachedPosition(new KeyCacheKey(descriptor, decoratedKey.key), true); if (cachedPosition != null) return cachedPosition; } @@ -670,8 +672,12 @@ public class SSTableReader extends SSTable { if (op == Operator.EQ) bloomFilterTracker.addFalsePositive(); - // we matched the -1th position: if the operator might match forward, return the 0th position - return op.apply(1) >= 0 ? 0 : -1; + // we matched the -1th position: if the operator might match forward, we'll start at the first + // position. We however need to return the correct index entry for that first position. 
+ if (op.apply(1) >= 0) + sampledPosition = 0; + else + return null; } // scan the on-disk index, starting at the nearest sampled position @@ -685,29 +691,29 @@ public class SSTableReader extends SSTable { // read key & data position from index entry DecoratedKey indexDecoratedKey = decodeKey(partitioner, descriptor, ByteBufferUtil.readWithShortLength(input)); - long dataPosition = input.readLong(); - int comparison = indexDecoratedKey.compareTo(key); int v = op.apply(comparison); if (v == 0) { + RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(input, descriptor); if (comparison == 0 && keyCache != null && keyCache.getCapacity() > 0) { assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key DecoratedKey decoratedKey = (DecoratedKey)key; // store exact match for the key - cacheKey(decoratedKey, dataPosition); + cacheKey(decoratedKey, indexEntry); } if (op == Operator.EQ) bloomFilterTracker.addTruePositive(); - return dataPosition; + return indexEntry; } if (v < 0) { if (op == Operator.EQ) bloomFilterTracker.addFalsePositive(); - return -1; + return null; } + RowIndexEntry.serializer.skip(input, descriptor); } } catch (IOException e) @@ -723,7 +729,7 @@ public class SSTableReader extends SSTable if (op == Operator.EQ) bloomFilterTracker.addFalsePositive(); - return -1; + return null; } /** @@ -842,12 +848,8 @@ public class SSTableReader extends SSTable return new SSTableBoundedScanner(this, true, range); } - public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize) + public FileDataInput getFileDataInput(long position) { - long position = getPosition(decoratedKey, Operator.EQ); - if (position < 0) - return null; - return dfile.getSegment(position); } @@ -889,6 +891,11 @@ public class SSTableReader extends SSTable return p.decorateKey(bytes); } + public DecoratedKey decodeKey(ByteBuffer bytes) + { + return decodeKey(partitioner, descriptor, bytes); + } + /** * TODO: Move someplace reusable 
*/ @@ -940,7 +947,7 @@ public class SSTableReader extends SSTable return bloomFilterTracker.getRecentTruePositiveCount(); } - public InstrumentingCache<KeyCacheKey, Long> getKeyCache() + public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache() { return keyCache; } @@ -982,6 +989,11 @@ public class SSTableReader extends SSTable : RandomAccessReader.open(new File(getFilename()), skipIOCache); } + public RandomAccessReader openIndexReader(boolean skipIOCache) throws IOException + { + return RandomAccessReader.open(new File(getIndexFilename()), skipIOCache); + } + /** * @param sstables * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false. http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 8f03cc1..902f926 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -26,10 +26,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CloseableIterator; @@ -37,12 +39,13 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> { private static final Logger logger = LoggerFactory.getLogger(SSTableScanner.class); - protected final RandomAccessReader file; + protected 
final RandomAccessReader dfile; + protected final RandomAccessReader ifile; public final SSTableReader sstable; private IColumnIterator row; protected boolean exhausted = false; protected Iterator<IColumnIterator> iterator; - private QueryFilter filter; + private final QueryFilter filter; /** * @param sstable SSTable to scan. @@ -51,7 +54,8 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> { try { - this.file = sstable.openDataReader(skipCache); + this.dfile = sstable.openDataReader(skipCache); + this.ifile = sstable.openIndexReader(skipCache); } catch (IOException e) { @@ -59,6 +63,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> throw new IOError(e); } this.sstable = sstable; + this.filter = null; } /** @@ -69,7 +74,8 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> { try { - this.file = sstable.openDataReader(false); + this.dfile = sstable.openDataReader(false); + this.ifile = sstable.openIndexReader(false); } catch (IOException e) { @@ -82,21 +88,40 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> public void close() throws IOException { - file.close(); + FileUtils.close(dfile, ifile); } public void seekTo(RowPosition seekKey) { try { - long position = sstable.getPosition(seekKey, SSTableReader.Operator.GE); - if (position < 0) + long indexPosition = sstable.getIndexScanPosition(seekKey); + // -1 means the key is before everything in the sstable. So just start from the beginning. 
+ if (indexPosition == -1) + indexPosition = 0; + + ifile.seek(indexPosition); + + while (!ifile.isEOF()) { - exhausted = true; - return; + long startPosition = ifile.getFilePointer(); + DecoratedKey indexDecoratedKey = sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile)); + int comparison = indexDecoratedKey.compareTo(seekKey); + if (comparison >= 0) + { + // Found, just read the dataPosition and seek into index and data files + long dataPosition = ifile.readLong(); + ifile.seek(startPosition); + dfile.seek(dataPosition); + row = null; + return; + } + else + { + RowIndexEntry.serializer.skip(ifile, sstable.descriptor); + } } - file.seek(position); - row = null; + exhausted = true; } catch (IOException e) { @@ -109,7 +134,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> { try { - return file.length(); + return dfile.length(); } catch (IOException e) { @@ -119,20 +144,20 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> public long getFilePointer() { - return file.getFilePointer(); + return dfile.getFilePointer(); } public boolean hasNext() { if (iterator == null) - iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new KeyScanningIterator(); + iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : createIterator(); return iterator.hasNext(); } public IColumnIterator next() { if (iterator == null) - iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new KeyScanningIterator(); + iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : createIterator(); return iterator.next(); } @@ -141,6 +166,11 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> throw new UnsupportedOperationException(); } + private Iterator<IColumnIterator> createIterator() + { + return filter == null ? 
new KeyScanningIterator() : new FilteredKeyScanningIterator(); + } + protected class KeyScanningIterator implements Iterator<IColumnIterator> { protected long finishedAt; @@ -150,8 +180,8 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> try { if (row == null) - return !file.isEOF(); - return finishedAt < file.length(); + return !dfile.isEOF(); + return finishedAt < dfile.length(); } catch (IOException e) { @@ -165,25 +195,88 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> try { if (row != null) - file.seek(finishedAt); - assert !file.isEOF(); - - DecoratedKey key = SSTableReader.decodeKey(sstable.partitioner, - sstable.descriptor, - ByteBufferUtil.readWithShortLength(file)); - long dataSize = SSTableReader.readRowSize(file, sstable.descriptor); - long dataStart = file.getFilePointer(); + dfile.seek(finishedAt); + assert !dfile.isEOF(); + + // Read data header + DecoratedKey key = sstable.decodeKey(ByteBufferUtil.readWithShortLength(dfile)); + long dataSize = SSTableReader.readRowSize(dfile, sstable.descriptor); + long dataStart = dfile.getFilePointer(); finishedAt = dataStart + dataSize; - if (filter == null) + row = new SSTableIdentityIterator(sstable, dfile, key, dataStart, dataSize); + return row; + } + catch (IOException e) + { + sstable.markSuspect(); + throw new RuntimeException(SSTableScanner.this + " failed to provide next columns from " + this, e); + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "(" + "finishedAt:" + finishedAt + ")"; + } + } + + protected class FilteredKeyScanningIterator implements Iterator<IColumnIterator> + { + protected DecoratedKey nextKey; + protected RowIndexEntry nextEntry; + + public boolean hasNext() + { + try + { + if (row == null) + return !ifile.isEOF(); + return nextKey != null; + } + catch (IOException e) + { + sstable.markSuspect(); + throw new 
RuntimeException(e); + } + } + + public IColumnIterator next() + { + try + { + DecoratedKey currentKey; + RowIndexEntry currentEntry; + + if (row == null) { - row = new SSTableIdentityIterator(sstable, file, key, dataStart, dataSize); - return row; + currentKey = sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile)); + currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor); } else { - return row = filter.getSSTableColumnIterator(sstable, file, key); + currentKey = nextKey; + currentEntry = nextEntry; } + + if (ifile.isEOF()) + { + nextKey = null; + nextEntry = null; + } + else + { + nextKey = sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile)); + nextEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor); + } + + assert !dfile.isEOF(); + return row = filter.getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); } catch (IOException e) { @@ -196,19 +289,14 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> { throw new UnsupportedOperationException(); } - - @Override - public String toString() { - return getClass().getSimpleName() + "(" + - "finishedAt:" + finishedAt + - ")"; } -} @Override - public String toString() { + public String toString() + { return getClass().getSimpleName() + "(" + - "file=" + file + + "dfile=" + dfile + + " ifile=" + ifile + " sstable=" + sstable + " exhausted=" + exhausted + ")"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index e07e151..9225b9e 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -137,7 +137,7 @@ public class SSTableWriter extends SSTable return (lastWrittenKey 
== null) ? 0 : dataFile.getFilePointer(); } - private void afterAppend(DecoratedKey decoratedKey, long dataPosition) throws IOException + private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index) throws IOException { lastWrittenKey = decoratedKey; this.last = lastWrittenKey; @@ -146,11 +146,13 @@ public class SSTableWriter extends SSTable if (logger.isTraceEnabled()) logger.trace("wrote " + decoratedKey + " at " + dataPosition); - iwriter.afterAppend(decoratedKey, dataPosition); + RowIndexEntry entry = RowIndexEntry.create(dataPosition, delInfo, index); + iwriter.append(decoratedKey, entry); dbuilder.addPotentialBoundary(dataPosition); + return entry; } - public long append(AbstractCompactedRow row) throws IOException + public RowIndexEntry append(AbstractCompactedRow row) throws IOException { long currentPosition = beforeAppend(row.key); ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream); @@ -158,10 +160,8 @@ public class SSTableWriter extends SSTable long dataSize = row.write(dataFile.stream); assert dataSize == dataFile.getFilePointer() - (dataStart + 8) : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8)); - sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats()); - afterAppend(row.key, currentPosition); - return currentPosition; + return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index()); } public void append(DecoratedKey decoratedKey, ColumnFamily cf) throws IOException @@ -169,29 +169,19 @@ public class SSTableWriter extends SSTable long startPosition = beforeAppend(decoratedKey); ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream); - // serialize index and bloom filter into in-memory structure - ColumnIndexer.RowHeader header = ColumnIndexer.serialize(cf); + // build column index + // TODO: build and serialization 
could be merged + ColumnIndex index = new ColumnIndex.Builder(cf.getComparator(), decoratedKey.key, cf.getColumnCount()).build(cf); - // write out row size - dataFile.stream.writeLong(header.serializedSize() + cf.serializedSizeForSSTable()); + // write out row size + data + dataFile.stream.writeLong(cf.serializedSizeForSSTable()); + ColumnFamily.serializer().serializeForSSTable(cf, dataFile.stream); - // write out row header and data - ColumnFamily.serializer().serializeWithIndexes(cf, header, dataFile.stream); - afterAppend(decoratedKey, startPosition); + afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index); sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats()); } - public void append(DecoratedKey decoratedKey, ByteBuffer value) throws IOException - { - long currentPosition = beforeAppend(decoratedKey); - ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream); - assert value.remaining() > 0; - dataFile.stream.writeLong(value.remaining()); - ByteBufferUtil.write(value, dataFile.stream); - afterAppend(decoratedKey, currentPosition); - } - public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSize, DataInput in) throws IOException { long currentPosition = beforeAppend(key); @@ -201,21 +191,12 @@ public class SSTableWriter extends SSTable // write row size dataFile.stream.writeLong(dataSize); - // write BF - int bfSize = in.readInt(); - dataFile.stream.writeInt(bfSize); - for (int i = 0; i < bfSize; i++) - dataFile.stream.writeByte(in.readByte()); - - // write index - int indexSize = in.readInt(); - dataFile.stream.writeInt(indexSize); - for (int i = 0; i < indexSize; i++) - dataFile.stream.writeByte(in.readByte()); - // cf data - dataFile.stream.writeInt(in.readInt()); - dataFile.stream.writeLong(in.readLong()); + int lct = in.readInt(); + long mfda = in.readLong(); + DeletionInfo deletionInfo = new DeletionInfo(mfda, lct); + dataFile.stream.writeInt(lct); + 
dataFile.stream.writeLong(mfda); // column size int columnCount = in.readInt(); @@ -225,6 +206,7 @@ public class SSTableWriter extends SSTable long maxTimestamp = Long.MIN_VALUE; StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory()); + ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf.getComparator(), key.key, columnCount); for (int i = 0; i < columnCount; i++) { // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the @@ -254,6 +236,7 @@ public class SSTableWriter extends SSTable } maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp()); cf.getColumnSerializer().serialize(column, dataFile.stream); + columnIndexer.add(column); } assert dataSize == dataFile.getFilePointer() - (dataStart + 8) @@ -262,7 +245,7 @@ public class SSTableWriter extends SSTable sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition); sstableMetadataCollector.addColumnCount(columnCount); sstableMetadataCollector.mergeTombstoneHistogram(tombstones); - afterAppend(key, currentPosition); + afterAppend(key, currentPosition, deletionInfo, columnIndexer.build()); return currentPosition; } @@ -401,14 +384,14 @@ public class SSTableWriter extends SSTable : BloomFilter.getFilter(keyCount, fpChance); } - public void afterAppend(DecoratedKey key, long dataPosition) throws IOException + public void append(DecoratedKey key, RowIndexEntry indexEntry) throws IOException { bf.add(key.key); long indexPosition = indexFile.getFilePointer(); ByteBufferUtil.writeWithShortLength(key.key, indexFile.stream); - indexFile.stream.writeLong(dataPosition); + RowIndexEntry.serializer.serialize(indexEntry, indexFile.stream); if (logger.isTraceEnabled()) - logger.trace("wrote index of " + key + " at " + indexPosition); + logger.trace("wrote index entry: " + indexEntry + " at " + indexPosition); summary.maybeAddEntry(key, 
indexPosition); builder.addPotentialBoundary(indexPosition); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/util/FileDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java index 200dfa7..d94075c 100644 --- a/src/java/org/apache/cassandra/io/util/FileDataInput.java +++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java @@ -30,12 +30,16 @@ public interface FileDataInput extends DataInput, Closeable public long bytesRemaining() throws IOException; + public void seek(long pos) throws IOException; + public FileMark mark(); public void reset(FileMark mark) throws IOException; public long bytesPastMark(FileMark mark); + public long getFilePointer(); + /** * Read length bytes from current file position * @param length length of the bytes to read http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 9b89e39..1687379 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -19,6 +19,7 @@ package org.apache.cassandra.io.util; import java.io.*; import java.text.DecimalFormat; +import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -98,6 +99,11 @@ public class FileUtils } } + public static void close(Closeable... cs) throws IOException + { + close(Arrays.asList(cs)); + } + public static void close(Iterable<? 
extends Closeable> cs) throws IOException { IOException e = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java index b3e7a23..786d312 100644 --- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java +++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java @@ -28,21 +28,24 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn { private final MappedByteBuffer buffer; private final String filename; + private final long segmentOffset; private int position; - public MappedFileDataInput(FileInputStream stream, String filename, int position) throws IOException + public MappedFileDataInput(FileInputStream stream, String filename, long segmentOffset, int position) throws IOException { FileChannel channel = stream.getChannel(); buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, channel.size()); this.filename = filename; + this.segmentOffset = segmentOffset; this.position = position; } - public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position) + public MappedFileDataInput(MappedByteBuffer buffer, String filename, long segmentOffset, int position) { assert buffer != null; this.buffer = buffer; this.filename = filename; + this.segmentOffset = segmentOffset; this.position = position; } @@ -52,6 +55,22 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn position = pos; } + // Only use when we know the seek is within the mapped segment. Throws an + // IOException otherwise. 
+ public void seek(long pos) throws IOException + { + long inSegmentPos = pos - segmentOffset; + if (inSegmentPos < 0 || inSegmentPos > buffer.capacity()) + throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity())); + + seekInternal((int) inSegmentPos); + } + + public long getFilePointer() + { + return segmentOffset + (long)position; + } + protected int getPosition() { return position; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java index 03b361e..3803963 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java @@ -77,7 +77,7 @@ public class MmappedSegmentedFile extends SegmentedFile if (segment.right != null) { // segment is mmap'd - return new MappedFileDataInput(segment.right, path, (int) (position - segment.left)); + return new MappedFileDataInput(segment.right, path, segment.left, (int) (position - segment.left)); } // not mmap'd: open a braf covering the segment http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 05fb51f..88fe1a0 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -29,6 +29,7 @@ import javax.management.ObjectName; import org.apache.cassandra.cache.*; import org.apache.cassandra.config.DatabaseDescriptor; import 
org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.utils.FBUtilities; import org.slf4j.Logger; @@ -61,7 +62,7 @@ public class CacheService implements CacheServiceMBean public final static CacheService instance = new CacheService(); - public final AutoSavingCache<KeyCacheKey, Long> keyCache; + public final AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache; public final AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache; private int rowCacheSavePeriod; @@ -91,7 +92,7 @@ public class CacheService implements CacheServiceMBean * We can use Weighers.singleton() because Long can't be leaking memory * @return auto saving cache object */ - private AutoSavingCache<KeyCacheKey, Long> initKeyCache() + private AutoSavingCache<KeyCacheKey, RowIndexEntry> initKeyCache() { logger.info("Initializing key cache with capacity of {} MBs.", DatabaseDescriptor.getKeyCacheSizeInMB()); @@ -99,8 +100,8 @@ public class CacheService implements CacheServiceMBean // as values are constant size we can use singleton weigher // where 48 = 40 bytes (average size of the key) + 8 bytes (size of value) - ICache<KeyCacheKey, Long> kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / AVERAGE_KEY_CACHE_ROW_SIZE); - AutoSavingCache<KeyCacheKey, Long> keyCache = new AutoSavingCache<KeyCacheKey, Long>(kc, CacheType.KEY_CACHE); + ICache<KeyCacheKey, RowIndexEntry> kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / AVERAGE_KEY_CACHE_ROW_SIZE); + AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE); int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/utils/StatusLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java 
b/src/java/org/apache/cassandra/utils/StatusLogger.java index b547bf2..8225863 100644 --- a/src/java/org/apache/cassandra/utils/StatusLogger.java +++ b/src/java/org/apache/cassandra/utils/StatusLogger.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.CacheService; @@ -86,7 +87,7 @@ public class StatusLogger "MessagingService", "n/a", pendingCommands + "," + pendingResponses)); // Global key/row cache information - AutoSavingCache<KeyCacheKey, Long> keyCache = CacheService.instance.keyCache; + AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache; AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache; int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 0a376b4..1f6a799 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -20,8 +20,7 @@ package org.apache.cassandra; * */ -import java.io.EOFException; -import java.io.IOException; +import java.io.*; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -56,6 +55,11 @@ public class Util return StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(key)); } + public static DecoratedKey dk(ByteBuffer key) + { + return StorageService.getPartitioner().decorateKey(key); + } + public static 
RowPosition rp(String key) { return rp(key, StorageService.getPartitioner()); @@ -256,4 +260,12 @@ public class Util assert thrown : exception.getName() + " not received"; } + + public static ByteBuffer serializeForSSTable(ColumnFamily cf) + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + cf.serializer().serializeForSSTable(cf, dos); + return ByteBuffer.wrap(baos.toByteArray()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index a47e2ea..01add3f 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -72,7 +72,7 @@ public class KeyCacheTest extends CleanupHelper assertEquals(100, CacheService.instance.keyCache.size()); // really? our caches don't implement the map interface? 
(hence no .addAll) - Map<KeyCacheKey, Long> savedMap = new HashMap<KeyCacheKey, Long>(); + Map<KeyCacheKey, RowIndexEntry> savedMap = new HashMap<KeyCacheKey, RowIndexEntry>(); for (KeyCacheKey k : CacheService.instance.keyCache.getKeySet()) { savedMap.put(k, CacheService.instance.keyCache.get(k)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/db/TableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/TableTest.java b/test/unit/org/apache/cassandra/db/TableTest.java index b5a3869..f3e090e 100644 --- a/test/unit/org/apache/cassandra/db/TableTest.java +++ b/test/unit/org/apache/cassandra/db/TableTest.java @@ -404,14 +404,8 @@ public class TableTest extends CleanupHelper } // verify that we do indeed have multiple index entries SSTableReader sstable = cfStore.getSSTables().iterator().next(); - long position = sstable.getPosition(key, SSTableReader.Operator.EQ); - RandomAccessReader file = sstable.openDataReader(false); - file.seek(position); - assert ByteBufferUtil.readWithShortLength(file).equals(key.key); - SSTableReader.readRowSize(file, sstable.descriptor); - IndexHelper.skipBloomFilter(file); - ArrayList<IndexHelper.IndexInfo> indexes = IndexHelper.deserializeIndex(file); - assert indexes.size() > 2; + RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); + assert indexEntry.columnsIndex().size() > 2; validateSliceLarge(cfStore); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java index 1704248..ef2adfa 100644 --- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java +++ 
b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java @@ -108,8 +108,8 @@ public class LazilyCompactedRowTest extends CleanupHelper new FileOutputStream(tmpFile1).write(out1.getData()); // writing data from row1 new FileOutputStream(tmpFile2).write(out2.getData()); // writing data from row2 - MappedFileDataInput in1 = new MappedFileDataInput(new FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0); - MappedFileDataInput in2 = new MappedFileDataInput(new FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0); + MappedFileDataInput in1 = new MappedFileDataInput(new FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0, 0); + MappedFileDataInput in2 = new MappedFileDataInput(new FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0, 0); // key isn't part of what CompactedRow writes, that's done by SSTW.append @@ -118,18 +118,6 @@ public class LazilyCompactedRowTest extends CleanupHelper long rowSize2 = SSTableReader.readRowSize(in2, sstables.iterator().next().descriptor); assertEquals(rowSize1 + 8, out1.getLength()); assertEquals(rowSize2 + 8, out2.getLength()); - // bloom filter - IndexHelper.defreezeBloomFilter(in1, rowSize1, false); - IndexHelper.defreezeBloomFilter(in2, rowSize2, false); - // index - int indexSize1 = in1.readInt(); - int indexSize2 = in2.readInt(); - assertEquals(indexSize1, indexSize2); - - ByteBuffer bytes1 = in1.readBytes(indexSize1); - ByteBuffer bytes2 = in2.readBytes(indexSize2); - - assert bytes1.equals(bytes2); // cf metadata ColumnFamily cf1 = ColumnFamily.create(cfs.metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index fede053..c54c912 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java 
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -123,7 +123,7 @@ public class SSTableReaderTest extends CleanupHelper for (int j = 0; j < 100; j += 2) { DecoratedKey dk = Util.dk(String.valueOf(j)); - FileDataInput file = sstable.getFileDataInput(dk, DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 1024); + FileDataInput file = sstable.getFileDataInput(sstable.getPosition(dk, SSTableReader.Operator.EQ).position); DecoratedKey keyInDisk = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(file)); @@ -134,7 +134,7 @@ public class SSTableReaderTest extends CleanupHelper for (int j = 1; j < 110; j += 2) { DecoratedKey dk = Util.dk(String.valueOf(j)); - assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == -1; + assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == null; } } @@ -184,10 +184,10 @@ public class SSTableReaderTest extends CleanupHelper CompactionManager.instance.performMaximal(store); SSTableReader sstable = store.getSSTables().iterator().next(); - long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ); - long p3 = sstable.getPosition(k(3), SSTableReader.Operator.EQ); - long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ); - long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ); + long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ).position; + long p3 = sstable.getPosition(k(3), SSTableReader.Operator.EQ).position; + long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position; + long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position; Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).iterator().next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java index 34f3226..12f2747 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java @@ -25,24 +25,30 @@ import java.util.*; import org.junit.Test; -import org.apache.cassandra.CleanupHelper; +import org.apache.cassandra.db.*; +import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.Util; -public class SSTableTest extends CleanupHelper +public class SSTableTest extends SchemaLoader { @Test - public void testSingleWrite() throws IOException { + public void testSingleWrite() throws IOException + { // write test data ByteBuffer key = ByteBufferUtil.bytes(Integer.toString(1)); - ByteBuffer bytes = ByteBuffer.wrap(new byte[1024]); - new Random().nextBytes(bytes.array()); + ByteBuffer cbytes = ByteBuffer.wrap(new byte[1024]); + new Random().nextBytes(cbytes.array()); + ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1"); + cf.addColumn(null, new Column(cbytes, cbytes)); - Map<ByteBuffer, ByteBuffer> map = new HashMap<ByteBuffer,ByteBuffer>(); - map.put(key, bytes); - SSTableReader ssTable = SSTableUtils.prepare().cf("Standard1").writeRaw(map); + SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, ColumnFamily>(); + map.put(Util.dk(key), cf); + SSTableReader ssTable = SSTableUtils.prepare().cf("Standard1").write(map); // verify + ByteBuffer bytes = Util.serializeForSSTable(cf); verifySingle(ssTable, bytes, key); ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk verifySingle(ssTable, bytes, key); @@ -51,7 +57,7 @@ public class SSTableTest extends CleanupHelper private void verifySingle(SSTableReader sstable, ByteBuffer bytes, ByteBuffer key) throws IOException { RandomAccessReader file = sstable.openDataReader(false); - 
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ)); + file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position); assert key.equals(ByteBufferUtil.readWithShortLength(file)); int size = (int)SSTableReader.readRowSize(file, sstable.descriptor); byte[] bytes2 = new byte[size]; @@ -60,20 +66,27 @@ public class SSTableTest extends CleanupHelper } @Test - public void testManyWrites() throws IOException { - Map<ByteBuffer, ByteBuffer> map = new HashMap<ByteBuffer,ByteBuffer>(); - for (int i = 100; i < 1000; ++i) + public void testManyWrites() throws IOException + { + SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, ColumnFamily>(); + SortedMap<ByteBuffer, ByteBuffer> bytesMap = new TreeMap<ByteBuffer, ByteBuffer>(); + //for (int i = 100; i < 1000; ++i) + for (int i = 100; i < 300; ++i) { - map.put(ByteBufferUtil.bytes(Integer.toString(i)), ByteBufferUtil.bytes(("Avinash Lakshman is a good man: " + i))); + ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard2"); + ByteBuffer bytes = ByteBufferUtil.bytes(("Avinash Lakshman is a good man: " + i)); + cf.addColumn(null, new Column(bytes, bytes)); + map.put(Util.dk(Integer.toString(i)), cf); + bytesMap.put(ByteBufferUtil.bytes(Integer.toString(i)), Util.serializeForSSTable(cf)); } // write - SSTableReader ssTable = SSTableUtils.prepare().cf("Standard2").writeRaw(map); + SSTableReader ssTable = SSTableUtils.prepare().cf("Standard2").write(map); // verify - verifyMany(ssTable, map); + verifyMany(ssTable, bytesMap); ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk - verifyMany(ssTable, map); + verifyMany(ssTable, bytesMap); Set<Component> live = SSTable.componentsFor(ssTable.descriptor); assert !live.isEmpty() : "SSTable has no live components"; @@ -84,11 +97,11 @@ public class SSTableTest extends CleanupHelper private void verifyMany(SSTableReader sstable, Map<ByteBuffer, 
ByteBuffer> map) throws IOException { List<ByteBuffer> keys = new ArrayList<ByteBuffer>(map.keySet()); - Collections.shuffle(keys); + //Collections.shuffle(keys); RandomAccessReader file = sstable.openDataReader(false); for (ByteBuffer key : keys) { - file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ)); + file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position); assert key.equals( ByteBufferUtil.readWithShortLength(file)); int size = (int)SSTableReader.readRowSize(file, sstable.descriptor); byte[] bytes2 = new byte[size]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 038558f..72239dd 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -179,12 +179,8 @@ public class SSTableUtils return write(map); } - public SSTableReader write(Map<String, ColumnFamily> entries) throws IOException + public SSTableReader write(SortedMap<DecoratedKey, ColumnFamily> sorted) throws IOException { - SortedMap<DecoratedKey, ColumnFamily> sorted = new TreeMap<DecoratedKey, ColumnFamily>(); - for (Map.Entry<String, ColumnFamily> entry : entries.entrySet()) - sorted.put(Util.dk(entry.getKey()), entry.getValue()); - final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter = sorted.entrySet().iterator(); return write(sorted.size(), new Appender() { @@ -200,30 +196,13 @@ public class SSTableUtils }); } - /** - * @Deprecated: Writes the binary content of a row, which should be encapsulated. 
- */ - @Deprecated - public SSTableReader writeRaw(Map<ByteBuffer, ByteBuffer> entries) throws IOException + public SSTableReader write(Map<String, ColumnFamily> entries) throws IOException { - File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA)); - SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), entries.size()); - SortedMap<DecoratedKey, ByteBuffer> sorted = new TreeMap<DecoratedKey, ByteBuffer>(); - for (Map.Entry<ByteBuffer, ByteBuffer> entry : entries.entrySet()) - sorted.put(writer.partitioner.decorateKey(entry.getKey()), entry.getValue()); - final Iterator<Map.Entry<DecoratedKey, ByteBuffer>> iter = sorted.entrySet().iterator(); - return write(sorted.size(), new Appender() - { - @Override - public boolean append(SSTableWriter writer) throws IOException - { - if (!iter.hasNext()) - return false; - Map.Entry<DecoratedKey, ByteBuffer> entry = iter.next(); - writer.append(entry.getKey(), entry.getValue()); - return true; - } - }); + SortedMap<DecoratedKey, ColumnFamily> sorted = new TreeMap<DecoratedKey, ColumnFamily>(); + for (Map.Entry<String, ColumnFamily> entry : entries.entrySet()) + sorted.put(Util.dk(entry.getKey()), entry.getValue()); + + return write(sorted); } public SSTableReader write(int expectedSize, Appender appender) throws IOException