http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index d69eb16..22cce77 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 
 import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.Pair;
@@ -44,7 +45,7 @@ public class SSTableBoundedScanner extends SSTableScanner
             currentRange = rangeIterator.next();
             try
             {
-                file.seek(currentRange.left);
+                dfile.seek(currentRange.left);
             }
             catch (IOException e)
             {
@@ -58,6 +59,16 @@ public class SSTableBoundedScanner extends SSTableScanner
         }
     }
 
+    /*
+     * This shouldn't be used with a bounded scanner, as it could put the
+     * scanner outside its range.
+     */
+    @Override
+    public void seekTo(RowPosition seekKey)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public boolean hasNext()
     {
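
Context for the new override above, not part of the patch: a bounded scanner is pinned to a byte range of the data file, so letting callers seek it arbitrarily could move it outside that range. A minimal, self-contained sketch of the same guard, using hypothetical names and plain JDK I/O rather than Cassandra's classes:

    import java.io.Closeable;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Illustrative only (not Cassandra's class): a reader pinned to the byte range
    // [start, end) of a file, which refuses arbitrary seeks for the same reason the
    // bounded scanner now does.
    class BoundedRangeReader implements Closeable
    {
        private final RandomAccessFile file;
        private final long end;

        BoundedRangeReader(String path, long start, long end) throws IOException
        {
            this.file = new RandomAccessFile(path, "r");
            this.end = end;
            file.seek(start); // position at the lower bound, as the scanner does with dfile
        }

        public void seekTo(long position)
        {
            // an arbitrary seek could land outside [start, end), so reject it
            throw new UnsupportedOperationException();
        }

        public boolean hasMore() throws IOException
        {
            return file.getFilePointer() < end;
        }

        public void close() throws IOException
        {
            file.close();
        }
    }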

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 695e966..f5895a2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -110,7 +110,7 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
                 if (dataStart + dataSize > file.length())
                     throw new IOException(String.format("dataSize of %s 
starting at %s would be larger than file %s length %s",
                                           dataSize, dataStart, file.getPath(), 
file.length()));
-                if (checkData)
+                if (checkData && !sstable.descriptor.hasPromotedIndexes)
                 {
                     try
                     {
@@ -137,8 +137,11 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
                 }
             }
 
-            IndexHelper.skipBloomFilter(inputWithTracker);
-            IndexHelper.skipIndex(inputWithTracker);
+            if (!sstable.descriptor.hasPromotedIndexes)
+            {
+                IndexHelper.skipBloomFilter(inputWithTracker);
+                IndexHelper.skipIndex(inputWithTracker);
+            }
             columnFamily = ColumnFamily.create(metadata);
             
ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, 
inputWithTracker);
             columnCount = inputWithTracker.readInt();
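
A rough sketch of what the hasPromotedIndexes branches above are dealing with: in the legacy data-file layout each row carries its own bloom filter and column index, which have to be skipped before the row's columns can be read; with promoted indexes those structures live in the index file instead. The names and the exact legacy framing below are simplifications for illustration, not the real IndexHelper code:

    import java.io.DataInput;
    import java.io.IOException;

    // Illustrative only: skip the legacy per-row structures unless the index has been
    // "promoted" out of the data file and into the index file.
    final class LegacyRowHeaderSkipper
    {
        // Assumed legacy framing: an int byte-length followed by that many bytes, once
        // for the row-level bloom filter and once for the column index.
        static void skipIfPresent(DataInput in, boolean hasPromotedIndexes) throws IOException
        {
            if (hasPromotedIndexes)
                return; // new format: nothing to skip before the column family header

            int bloomFilterSize = in.readInt();
            in.skipBytes(bloomFilterSize);

            int columnIndexSize = in.readInt();
            in.skipBytes(columnIndexSize);
        }
    }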

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 072b6f7..a41d2a7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -82,7 +82,7 @@ public class SSTableReader extends SSTable
     private IndexSummary indexSummary;
     private Filter bf;
 
-    private InstrumentingCache<KeyCacheKey, Long> keyCache;
+    private InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 
     private final BloomFilterTracker bloomFilterTracker = new 
BloomFilterTracker();
 
@@ -353,7 +353,7 @@ public class SSTableReader extends SSTable
             long histogramCount = sstableMetadata.estimatedRowSize.count();
             long estimatedKeys = histogramCount > 0 && 
!sstableMetadata.estimatedRowSize.isOverflowed()
                                ? histogramCount
-                               : SSTable.estimateRowsFromIndex(input); // 
statistics is supposed to be optional
+                               : estimateRowsFromIndex(input); // statistics 
is supposed to be optional
             indexSummary = new IndexSummary(estimatedKeys);
             if (recreatebloom)
                 bf = LegacyBloomFilter.getFilter(estimatedKeys, 15);
@@ -377,7 +377,7 @@ public class SSTableReader extends SSTable
                     left = decodeKey(partitioner, descriptor, skippedKey);
                 right = decodeKey(partitioner, descriptor, skippedKey);
 
-                long dataPosition = input.readLong();
+                RowIndexEntry indexEntry = 
RowIndexEntry.serializer.deserialize(input, descriptor);
                 if (key != null)
                 {
                     DecoratedKey decoratedKey = decodeKey(partitioner, 
descriptor, key);
@@ -387,12 +387,12 @@ public class SSTableReader extends SSTable
                         indexSummary.addEntry(decoratedKey, indexPosition);
                     // if key cache could be used and we have key already 
pre-loaded
                     if (cacheLoading && 
keysToLoadInCache.contains(decoratedKey))
-                        cacheKey(decoratedKey, dataPosition);
+                        cacheKey(decoratedKey, indexEntry);
                 }
 
                 indexSummary.incrementRowid();
                 ibuilder.addPotentialBoundary(indexPosition);
-                dbuilder.addPotentialBoundary(dataPosition);
+                dbuilder.addPotentialBoundary(indexEntry.position);
             }
             indexSummary.complete();
         }
@@ -409,7 +409,7 @@ public class SSTableReader extends SSTable
     }
 
     /** get the position in the index file to start scanning to find the given 
key (at most indexInterval keys away) */
-    private long getIndexScanPosition(RowPosition key)
+    public long getIndexScanPosition(RowPosition key)
     {
         assert indexSummary.getKeys() != null && indexSummary.getKeys().size() 
> 0;
         int index = Collections.binarySearch(indexSummary.getKeys(), key);
@@ -597,11 +597,13 @@ public class SSTableReader extends SSTable
         for (Range<Token> range : Range.normalize(ranges))
         {
             AbstractBounds<RowPosition> keyRange = range.toRowBounds();
-            long left = getPosition(keyRange.left, Operator.GT);
+            RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
+            long left = idxLeft == null ? -1 : idxLeft.position;
             if (left == -1)
                 // left is past the end of the file
                 continue;
-            long right = getPosition(keyRange.right, Operator.GT);
+            RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
+            long right = idxRight == null ? -1 : idxRight.position;
             if (right == -1 || Range.isWrapAround(range.left, range.right))
                 // right is past the end of the file, or it wraps
                 right = uncompressedLength();
@@ -613,7 +615,7 @@ public class SSTableReader extends SSTable
         return positions;
     }
 
-    public void cacheKey(DecoratedKey key, Long info)
+    public void cacheKey(DecoratedKey key, RowIndexEntry info)
     {
         CFMetaData.Caching caching = metadata.getCaching();
 
@@ -627,12 +629,12 @@ public class SSTableReader extends SSTable
         keyCache.put(new KeyCacheKey(descriptor, 
ByteBufferUtil.clone(key.key)), info);
     }
 
-    public Long getCachedPosition(DecoratedKey key, boolean updateStats)
+    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean 
updateStats)
     {
         return getCachedPosition(new KeyCacheKey(descriptor, key.key), 
updateStats);
     }
 
-    private Long getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
+    private RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean 
updateStats)
     {
         if (keyCache != null && keyCache.getCapacity() > 0)
             return updateStats ? keyCache.get(unifiedKey) : 
keyCache.getInternal(unifiedKey);
@@ -643,23 +645,23 @@ public class SSTableReader extends SSTable
      * @param key The key to apply as the rhs to the given Operator. A 'fake' 
key is allowed to
      * allow key selection by token bounds but only if op != * EQ
      * @param op The Operator defining matching keys: the nearest key to the 
target matching the operator wins.
-     * @return The position in the data file to find the key, or -1 if the key 
is not present
+     * @return The index entry corresponding to the key, or null if the key is 
not present
      */
-    public long getPosition(RowPosition key, Operator op)
+    public RowIndexEntry getPosition(RowPosition key, Operator op)
     {
         // first, check bloom filter
         if (op == Operator.EQ)
         {
             assert key instanceof DecoratedKey; // EQ only make sense if the 
key is a valid row key
             if (!bf.isPresent(((DecoratedKey)key).key))
-                return -1;
+                return null;
         }
 
         // next, the key cache (only make sense for valid row key)
         if ((op == Operator.EQ || op == Operator.GE) && (key instanceof 
DecoratedKey))
         {
             DecoratedKey decoratedKey = (DecoratedKey)key;
-            Long cachedPosition = getCachedPosition(new 
KeyCacheKey(descriptor, decoratedKey.key), true);
+            RowIndexEntry cachedPosition = getCachedPosition(new 
KeyCacheKey(descriptor, decoratedKey.key), true);
             if (cachedPosition != null)
                 return cachedPosition;
         }
@@ -670,8 +672,12 @@ public class SSTableReader extends SSTable
         {
             if (op == Operator.EQ)
                 bloomFilterTracker.addFalsePositive();
-            // we matched the -1th position: if the operator might match 
forward, return the 0th position
-            return op.apply(1) >= 0 ? 0 : -1;
+            // we matched the -1th position: if the operator might match 
forward, we'll start at the first
+            // position. We however need to return the correct index entry for 
that first position.
+            if (op.apply(1) >= 0)
+                sampledPosition = 0;
+            else
+                return null;
         }
 
         // scan the on-disk index, starting at the nearest sampled position
@@ -685,29 +691,29 @@ public class SSTableReader extends SSTable
                 {
                     // read key & data position from index entry
                     DecoratedKey indexDecoratedKey = decodeKey(partitioner, 
descriptor, ByteBufferUtil.readWithShortLength(input));
-                    long dataPosition = input.readLong();
-
                     int comparison = indexDecoratedKey.compareTo(key);
                     int v = op.apply(comparison);
                     if (v == 0)
                     {
+                        RowIndexEntry indexEntry = 
RowIndexEntry.serializer.deserialize(input, descriptor);
                         if (comparison == 0 && keyCache != null && 
keyCache.getCapacity() > 0)
                         {
                             assert key instanceof DecoratedKey; // key can be 
== to the index key only if it's a true row key
                             DecoratedKey decoratedKey = (DecoratedKey)key;
                             // store exact match for the key
-                            cacheKey(decoratedKey, dataPosition);
+                            cacheKey(decoratedKey, indexEntry);
                         }
                         if (op == Operator.EQ)
                             bloomFilterTracker.addTruePositive();
-                        return dataPosition;
+                        return indexEntry;
                     }
                     if (v < 0)
                     {
                         if (op == Operator.EQ)
                             bloomFilterTracker.addFalsePositive();
-                        return -1;
+                        return null;
                     }
+                    RowIndexEntry.serializer.skip(input, descriptor);
                 }
             }
             catch (IOException e)
@@ -723,7 +729,7 @@ public class SSTableReader extends SSTable
 
         if (op == Operator.EQ)
             bloomFilterTracker.addFalsePositive();
-        return -1;
+        return null;
     }
 
     /**
@@ -842,12 +848,8 @@ public class SSTableReader extends SSTable
         return new SSTableBoundedScanner(this, true, range);
     }
 
-    public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int 
bufferSize)
+    public FileDataInput getFileDataInput(long position)
     {
-        long position = getPosition(decoratedKey, Operator.EQ);
-        if (position < 0)
-            return null;
-
         return dfile.getSegment(position);
     }
 
@@ -889,6 +891,11 @@ public class SSTableReader extends SSTable
         return p.decorateKey(bytes);
     }
 
+    public DecoratedKey decodeKey(ByteBuffer bytes)
+    {
+        return decodeKey(partitioner, descriptor, bytes);
+    }
+
     /**
      * TODO: Move someplace reusable
      */
@@ -940,7 +947,7 @@ public class SSTableReader extends SSTable
         return bloomFilterTracker.getRecentTruePositiveCount();
     }
 
-    public InstrumentingCache<KeyCacheKey, Long> getKeyCache()
+    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
     {
         return keyCache;
     }
@@ -982,6 +989,11 @@ public class SSTableReader extends SSTable
                : RandomAccessReader.open(new File(getFilename()), skipIOCache);
     }
 
+    public RandomAccessReader openIndexReader(boolean skipIOCache) throws 
IOException
+    {
+        return RandomAccessReader.open(new File(getIndexFilename()), 
skipIOCache);
+    }
+
     /**
      * @param sstables
      * @return true if all desired references were acquired.  Otherwise, it 
will unreference any partial acquisition, and return false.
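
For orientation (RowIndexEntry itself is defined elsewhere in this commit and is not shown in this diff): the key cache value and the return type of getPosition change from a bare long data-file offset to an entry object, with null taking over the old -1 "not present" convention. A stand-in sketch of that shape, with assumed names, just to show what callers now hold:

    import java.util.Collections;
    import java.util.List;

    // Illustrative stand-in, not org.apache.cassandra.db.RowIndexEntry: always carries
    // the data-file position, and may also carry the promoted per-row column index.
    class IndexEntrySketch
    {
        public final long position;
        private final List<Object> columnsIndex; // element type simplified for the sketch

        IndexEntrySketch(long position, List<Object> columnsIndex)
        {
            this.position = position;
            this.columnsIndex = columnsIndex;
        }

        public List<Object> columnsIndex()
        {
            return columnsIndex == null ? Collections.<Object>emptyList() : columnsIndex;
        }
    }

Callers that used to compare against -1 (getPositionsForRanges above, the tests further down) now check for null and read entry.position when all they need is the offset.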

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 8f03cc1..902f926 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -26,10 +26,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CloseableIterator;
 
@@ -37,12 +39,13 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(SSTableScanner.class);
 
-    protected final RandomAccessReader file;
+    protected final RandomAccessReader dfile;
+    protected final RandomAccessReader ifile;
     public final SSTableReader sstable;
     private IColumnIterator row;
     protected boolean exhausted = false;
     protected Iterator<IColumnIterator> iterator;
-    private QueryFilter filter;
+    private final QueryFilter filter;
 
     /**
      * @param sstable SSTable to scan.
@@ -51,7 +54,8 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
     {
         try
         {
-            this.file = sstable.openDataReader(skipCache);
+            this.dfile = sstable.openDataReader(skipCache);
+            this.ifile = sstable.openIndexReader(skipCache);
         }
         catch (IOException e)
         {
@@ -59,6 +63,7 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
             throw new IOError(e);
         }
         this.sstable = sstable;
+        this.filter = null;
     }
 
     /**
@@ -69,7 +74,8 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
     {
         try
         {
-            this.file = sstable.openDataReader(false);
+            this.dfile = sstable.openDataReader(false);
+            this.ifile = sstable.openIndexReader(false);
         }
         catch (IOException e)
         {
@@ -82,21 +88,40 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
 
     public void close() throws IOException
     {
-        file.close();
+        FileUtils.close(dfile, ifile);
     }
 
     public void seekTo(RowPosition seekKey)
     {
         try
         {
-            long position = sstable.getPosition(seekKey, 
SSTableReader.Operator.GE);
-            if (position < 0)
+            long indexPosition = sstable.getIndexScanPosition(seekKey);
+            // -1 means the key is before everything in the sstable. So just 
start from the beginning.
+            if (indexPosition == -1)
+                indexPosition = 0;
+
+            ifile.seek(indexPosition);
+
+            while (!ifile.isEOF())
             {
-                exhausted = true;
-                return;
+                long startPosition = ifile.getFilePointer();
+                DecoratedKey indexDecoratedKey = 
sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile));
+                int comparison = indexDecoratedKey.compareTo(seekKey);
+                if (comparison >= 0)
+                {
+                    // Found, just read the dataPosition and seek into index 
and data files
+                    long dataPosition = ifile.readLong();
+                    ifile.seek(startPosition);
+                    dfile.seek(dataPosition);
+                    row = null;
+                    return;
+                }
+                else
+                {
+                    RowIndexEntry.serializer.skip(ifile, sstable.descriptor);
+                }
             }
-            file.seek(position);
-            row = null;
+            exhausted = true;
         }
         catch (IOException e)
         {
@@ -109,7 +134,7 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
     {
         try
         {
-            return file.length();
+            return dfile.length();
         }
         catch (IOException e)
         {
@@ -119,20 +144,20 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
 
     public long getFilePointer()
     {
-        return file.getFilePointer();
+        return dfile.getFilePointer();
     }
 
     public boolean hasNext()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new 
IColumnIterator[0]).iterator() : new KeyScanningIterator();
+            iterator = exhausted ? Arrays.asList(new 
IColumnIterator[0]).iterator() : createIterator();
         return iterator.hasNext();
     }
 
     public IColumnIterator next()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new 
IColumnIterator[0]).iterator() : new KeyScanningIterator();
+            iterator = exhausted ? Arrays.asList(new 
IColumnIterator[0]).iterator() : createIterator();
         return iterator.next();
     }
 
@@ -141,6 +166,11 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
         throw new UnsupportedOperationException();
     }
 
+    private Iterator<IColumnIterator> createIterator()
+    {
+        return filter == null ? new KeyScanningIterator() : new 
FilteredKeyScanningIterator();
+    }
+
     protected class KeyScanningIterator implements Iterator<IColumnIterator>
     {
         protected long finishedAt;
@@ -150,8 +180,8 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
             try
             {
                 if (row == null)
-                    return !file.isEOF();
-                return finishedAt < file.length();
+                    return !dfile.isEOF();
+                return finishedAt < dfile.length();
             }
             catch (IOException e)
             {
@@ -165,25 +195,88 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
             try
             {
                 if (row != null)
-                    file.seek(finishedAt);
-                assert !file.isEOF();
-
-                DecoratedKey key = SSTableReader.decodeKey(sstable.partitioner,
-                                                           sstable.descriptor,
-                                                           
ByteBufferUtil.readWithShortLength(file));
-                long dataSize = SSTableReader.readRowSize(file, 
sstable.descriptor);
-                long dataStart = file.getFilePointer();
+                    dfile.seek(finishedAt);
+                assert !dfile.isEOF();
+
+                // Read data header
+                DecoratedKey key = 
sstable.decodeKey(ByteBufferUtil.readWithShortLength(dfile));
+                long dataSize = SSTableReader.readRowSize(dfile, 
sstable.descriptor);
+                long dataStart = dfile.getFilePointer();
                 finishedAt = dataStart + dataSize;
 
-                if (filter == null)
+                row = new SSTableIdentityIterator(sstable, dfile, key, 
dataStart, dataSize);
+                return row;
+            }
+            catch (IOException e)
+            {
+                sstable.markSuspect();
+                throw new RuntimeException(SSTableScanner.this + " failed to 
provide next columns from " + this, e);
+            }
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String toString()
+        {
+            return getClass().getSimpleName() + "(" + "finishedAt:" + 
finishedAt + ")";
+        }
+    }
+
+    protected class FilteredKeyScanningIterator implements 
Iterator<IColumnIterator>
+    {
+        protected DecoratedKey nextKey;
+        protected RowIndexEntry nextEntry;
+
+        public boolean hasNext()
+        {
+            try
+            {
+                if (row == null)
+                    return !ifile.isEOF();
+                return nextKey != null;
+            }
+            catch (IOException e)
+            {
+                sstable.markSuspect();
+                throw new RuntimeException(e);
+            }
+        }
+
+        public IColumnIterator next()
+        {
+            try
+            {
+                DecoratedKey currentKey;
+                RowIndexEntry currentEntry;
+
+                if (row == null)
                 {
-                    row = new SSTableIdentityIterator(sstable, file, key, 
dataStart, dataSize);
-                    return row;
+                    currentKey = 
sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile));
+                    currentEntry = RowIndexEntry.serializer.deserialize(ifile, 
sstable.descriptor);
                 }
                 else
                 {
-                    return row = filter.getSSTableColumnIterator(sstable, 
file, key);
+                    currentKey = nextKey;
+                    currentEntry = nextEntry;
                 }
+
+                if (ifile.isEOF())
+                {
+                    nextKey = null;
+                    nextEntry = null;
+                }
+                else
+                {
+                    nextKey = 
sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile));
+                    nextEntry = RowIndexEntry.serializer.deserialize(ifile, 
sstable.descriptor);
+                }
+
+                assert !dfile.isEOF();
+                return row = filter.getSSTableColumnIterator(sstable, dfile, 
currentKey, currentEntry);
             }
             catch (IOException e)
             {
@@ -196,19 +289,14 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
         {
             throw new UnsupportedOperationException();
         }
-
-        @Override
-        public String toString() {
-            return getClass().getSimpleName() + "(" +
-                   "finishedAt:" + finishedAt +
-                   ")";
     }
-}
 
     @Override
-    public String toString() {
+    public String toString()
+    {
         return getClass().getSimpleName() + "(" +
-               "file=" + file +
+               "dfile=" + dfile +
+               " ifile=" + ifile +
                " sstable=" + sstable +
                " exhausted=" + exhausted +
                ")";

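The reworked seekTo above walks the index component directly instead of asking SSTableReader for a data position. A simplified, self-contained version of that scan over a toy index of (short-length key, long data offset) records, using only JDK classes; the real entries are RowIndexEntry serializations rather than a bare long:

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;

    final class ToyIndexSeek
    {
        // Position 'data' at the first row whose key compares >= seekKey; return false
        // if the index is exhausted (the scanner marks itself exhausted in that case).
        static boolean seekTo(RandomAccessFile index, RandomAccessFile data, ByteBuffer seekKey) throws IOException
        {
            index.seek(0); // the real code starts from getIndexScanPosition(seekKey) instead
            while (index.getFilePointer() < index.length())
            {
                int keyLength = index.readUnsignedShort();   // "write with short length" framing
                byte[] key = new byte[keyLength];
                index.readFully(key);
                long dataPosition = index.readLong();        // toy entry: just a data offset

                if (ByteBuffer.wrap(key).compareTo(seekKey) >= 0)
                {
                    data.seek(dataPosition);
                    return true;
                }
            }
            return false;
        }
    }
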
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index e07e151..9225b9e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -137,7 +137,7 @@ public class SSTableWriter extends SSTable
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
-    private void afterAppend(DecoratedKey decoratedKey, long dataPosition) 
throws IOException
+    private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long 
dataPosition, DeletionInfo delInfo, ColumnIndex index) throws IOException
     {
         lastWrittenKey = decoratedKey;
         this.last = lastWrittenKey;
@@ -146,11 +146,13 @@ public class SSTableWriter extends SSTable
 
         if (logger.isTraceEnabled())
             logger.trace("wrote " + decoratedKey + " at " + dataPosition);
-        iwriter.afterAppend(decoratedKey, dataPosition);
+        RowIndexEntry entry = RowIndexEntry.create(dataPosition, delInfo, 
index);
+        iwriter.append(decoratedKey, entry);
         dbuilder.addPotentialBoundary(dataPosition);
+        return entry;
     }
 
-    public long append(AbstractCompactedRow row) throws IOException
+    public RowIndexEntry append(AbstractCompactedRow row) throws IOException
     {
         long currentPosition = beforeAppend(row.key);
         ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
@@ -158,10 +160,8 @@ public class SSTableWriter extends SSTable
         long dataSize = row.write(dataFile.stream);
         assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
                 : "incorrect row data size " + dataSize + " written to " + 
dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart 
+ 8));
-
         sstableMetadataCollector.update(dataFile.getFilePointer() - 
currentPosition, row.columnStats());
-        afterAppend(row.key, currentPosition);
-        return currentPosition;
+        return afterAppend(row.key, currentPosition, row.deletionInfo(), 
row.index());
     }
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf) throws 
IOException
@@ -169,29 +169,19 @@ public class SSTableWriter extends SSTable
         long startPosition = beforeAppend(decoratedKey);
         ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
 
-        // serialize index and bloom filter into in-memory structure
-        ColumnIndexer.RowHeader header = ColumnIndexer.serialize(cf);
+        // build column index
+        // TODO: build and serialization could be merged
+        ColumnIndex index = new ColumnIndex.Builder(cf.getComparator(), 
decoratedKey.key, cf.getColumnCount()).build(cf);
 
-        // write out row size
-        dataFile.stream.writeLong(header.serializedSize() + 
cf.serializedSizeForSSTable());
+        // write out row size + data
+        dataFile.stream.writeLong(cf.serializedSizeForSSTable());
+        ColumnFamily.serializer().serializeForSSTable(cf, dataFile.stream);
 
-        // write out row header and data
-        ColumnFamily.serializer().serializeWithIndexes(cf, header, 
dataFile.stream);
-        afterAppend(decoratedKey, startPosition);
+        afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
 
         sstableMetadataCollector.update(dataFile.getFilePointer() - 
startPosition, cf.getColumnStats());
     }
 
-    public void append(DecoratedKey decoratedKey, ByteBuffer value) throws 
IOException
-    {
-        long currentPosition = beforeAppend(decoratedKey);
-        ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
-        assert value.remaining() > 0;
-        dataFile.stream.writeLong(value.remaining());
-        ByteBufferUtil.write(value, dataFile.stream);
-        afterAppend(decoratedKey, currentPosition);
-    }
-
     public long appendFromStream(DecoratedKey key, CFMetaData metadata, long 
dataSize, DataInput in) throws IOException
     {
         long currentPosition = beforeAppend(key);
@@ -201,21 +191,12 @@ public class SSTableWriter extends SSTable
         // write row size
         dataFile.stream.writeLong(dataSize);
 
-        // write BF
-        int bfSize = in.readInt();
-        dataFile.stream.writeInt(bfSize);
-        for (int i = 0; i < bfSize; i++)
-            dataFile.stream.writeByte(in.readByte());
-
-        // write index
-        int indexSize = in.readInt();
-        dataFile.stream.writeInt(indexSize);
-        for (int i = 0; i < indexSize; i++)
-            dataFile.stream.writeByte(in.readByte());
-
         // cf data
-        dataFile.stream.writeInt(in.readInt());
-        dataFile.stream.writeLong(in.readLong());
+        int lct = in.readInt();
+        long mfda = in.readLong();
+        DeletionInfo deletionInfo = new DeletionInfo(mfda, lct);
+        dataFile.stream.writeInt(lct);
+        dataFile.stream.writeLong(mfda);
 
         // column size
         int columnCount = in.readInt();
@@ -225,6 +206,7 @@ public class SSTableWriter extends SSTable
         long maxTimestamp = Long.MIN_VALUE;
         StreamingHistogram tombstones = new 
StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
         ColumnFamily cf = ColumnFamily.create(metadata, 
ArrayBackedSortedColumns.factory());
+        ColumnIndex.Builder columnIndexer = new 
ColumnIndex.Builder(cf.getComparator(), key.key, columnCount);
         for (int i = 0; i < columnCount; i++)
         {
             // deserialize column with PRESERVE_SIZE because we've written the 
dataSize based on the
@@ -254,6 +236,7 @@ public class SSTableWriter extends SSTable
             }
             maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
             cf.getColumnSerializer().serialize(column, dataFile.stream);
+            columnIndexer.add(column);
         }
 
         assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
@@ -262,7 +245,7 @@ public class SSTableWriter extends SSTable
         sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - 
currentPosition);
         sstableMetadataCollector.addColumnCount(columnCount);
         sstableMetadataCollector.mergeTombstoneHistogram(tombstones);
-        afterAppend(key, currentPosition);
+        afterAppend(key, currentPosition, deletionInfo, columnIndexer.build());
         return currentPosition;
     }
 
@@ -401,14 +384,14 @@ public class SSTableWriter extends SSTable
                : BloomFilter.getFilter(keyCount, fpChance);
         }
 
-        public void afterAppend(DecoratedKey key, long dataPosition) throws 
IOException
+        public void append(DecoratedKey key, RowIndexEntry indexEntry) throws 
IOException
         {
             bf.add(key.key);
             long indexPosition = indexFile.getFilePointer();
             ByteBufferUtil.writeWithShortLength(key.key, indexFile.stream);
-            indexFile.stream.writeLong(dataPosition);
+            RowIndexEntry.serializer.serialize(indexEntry, indexFile.stream);
             if (logger.isTraceEnabled())
-                logger.trace("wrote index of " + key + " at " + indexPosition);
+                logger.trace("wrote index entry: " + indexEntry + " at " + 
indexPosition);
 
             summary.maybeAddEntry(key, indexPosition);
             builder.addPotentialBoundary(indexPosition);
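
IndexWriter.append above now serializes a RowIndexEntry per key instead of a bare long offset. As a toy illustration of why the entry needs its own serializer, here is a self-contained sketch of an entry that can optionally carry a promoted block alongside the data offset; the framing is an assumption made for the example, not RowIndexEntry's actual format:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    final class ToyIndexEntryWriter
    {
        // Toy framing: key (short length + bytes), data offset, then an optional
        // promoted block written with an int size prefix (0 means "position only").
        static void appendEntry(DataOutput out, byte[] key, long dataPosition, byte[] promoted) throws IOException
        {
            out.writeShort(key.length);
            out.write(key);
            out.writeLong(dataPosition);
            out.writeInt(promoted == null ? 0 : promoted.length);
            if (promoted != null)
                out.write(promoted);
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            appendEntry(new DataOutputStream(bytes), "k1".getBytes("UTF-8"), 0L, null);
            System.out.println(bytes.size() + " bytes written for a position-only entry");
        }
    }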

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/util/FileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java
index 200dfa7..d94075c 100644
--- a/src/java/org/apache/cassandra/io/util/FileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java
@@ -30,12 +30,16 @@ public interface FileDataInput extends DataInput, Closeable
 
     public long bytesRemaining() throws IOException;
 
+    public void seek(long pos) throws IOException;
+
     public FileMark mark();
 
     public void reset(FileMark mark) throws IOException;
 
     public long bytesPastMark(FileMark mark);
 
+    public long getFilePointer();
+
     /**
      * Read length bytes from current file position
      * @param length length of the bytes to read

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 9b89e39..1687379 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 import java.text.DecimalFormat;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
@@ -98,6 +99,11 @@ public class FileUtils
         }
     }
 
+    public static void close(Closeable... cs) throws IOException
+    {
+        close(Arrays.asList(cs));
+    }
+
     public static void close(Iterable<? extends Closeable> cs) throws 
IOException
     {
         IOException e = null;
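
The new varargs overload above delegates to the existing Iterable version. A self-contained sketch of that close-everything-then-rethrow idea (the names are mine, and the real FileUtils.close keeps slightly different bookkeeping):

    import java.io.Closeable;
    import java.io.IOException;

    final class CloseAll
    {
        // Close every argument even if an earlier close fails, then rethrow one of the
        // IOExceptions so the failure is not silently swallowed.
        static void close(Closeable... closeables) throws IOException
        {
            IOException pending = null;
            for (Closeable c : closeables)
            {
                try
                {
                    if (c != null)
                        c.close();
                }
                catch (IOException e)
                {
                    pending = e;
                }
            }
            if (pending != null)
                throw pending;
        }
    }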

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
index b3e7a23..786d312 100644
--- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
@@ -28,21 +28,24 @@ public class MappedFileDataInput extends AbstractDataInput 
implements FileDataIn
 {
     private final MappedByteBuffer buffer;
     private final String filename;
+    private final long segmentOffset;
     private int position;
 
-    public MappedFileDataInput(FileInputStream stream, String filename, int 
position) throws IOException
+    public MappedFileDataInput(FileInputStream stream, String filename, long 
segmentOffset, int position) throws IOException
     {
         FileChannel channel = stream.getChannel();
         buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, 
channel.size());
         this.filename = filename;
+        this.segmentOffset = segmentOffset;
         this.position = position;
     }
 
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename, int 
position)
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename, long 
segmentOffset, int position)
     {
         assert buffer != null;
         this.buffer = buffer;
         this.filename = filename;
+        this.segmentOffset = segmentOffset;
         this.position = position;
     }
 
@@ -52,6 +55,22 @@ public class MappedFileDataInput extends AbstractDataInput 
implements FileDataIn
         position = pos;
     }
 
+    // Only use when we know the seek is within the mapped segment. Throws an
+    // IOException otherwise.
+    public void seek(long pos) throws IOException
+    {
+        long inSegmentPos = pos - segmentOffset;
+        if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
+            throw new IOException(String.format("Seek position %d is not 
within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, 
buffer.capacity()));
+
+        seekInternal((int) inSegmentPos);
+    }
+
+    public long getFilePointer()
+    {
+        return segmentOffset + (long)position;
+    }
+
     protected int getPosition()
     {
         return position;
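
The segmentOffset bookkeeping above exists so that seek(long) and getFilePointer() can speak in absolute file positions while the mapped buffer only covers one segment. A small sketch of the two translations, with assumed names:

    import java.io.IOException;

    final class SegmentPositionMath
    {
        // Absolute file position -> offset inside the mapped segment, rejecting
        // positions the segment does not cover (the check the new seek(long) performs).
        static int toSegmentOffset(long absolutePosition, long segmentOffset, long segmentLength) throws IOException
        {
            long inSegment = absolutePosition - segmentOffset;
            if (inSegment < 0 || inSegment > segmentLength)
                throw new IOException("position " + absolutePosition + " is outside this mmap segment");
            return (int) inSegment;
        }

        // Offset inside the segment -> absolute file position, as getFilePointer() reports.
        static long toAbsolutePosition(long segmentOffset, int localPosition)
        {
            return segmentOffset + (long) localPosition;
        }
    }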

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 03b361e..3803963 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -77,7 +77,7 @@ public class MmappedSegmentedFile extends SegmentedFile
         if (segment.right != null)
         {
             // segment is mmap'd
-            return new MappedFileDataInput(segment.right, path, (int) 
(position - segment.left));
+            return new MappedFileDataInput(segment.right, path, segment.left, 
(int) (position - segment.left));
         }
 
         // not mmap'd: open a braf covering the segment

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 05fb51f..88fe1a0 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -29,6 +29,7 @@ import javax.management.ObjectName;
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.slf4j.Logger;
@@ -61,7 +62,7 @@ public class CacheService implements CacheServiceMBean
 
     public final static CacheService instance = new CacheService();
 
-    public final AutoSavingCache<KeyCacheKey, Long> keyCache;
+    public final AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache;
     public final AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache;
 
     private int rowCacheSavePeriod;
@@ -91,7 +92,7 @@ public class CacheService implements CacheServiceMBean
      * We can use Weighers.singleton() because Long can't be leaking memory
      * @return auto saving cache object
      */
-    private AutoSavingCache<KeyCacheKey, Long> initKeyCache()
+    private AutoSavingCache<KeyCacheKey, RowIndexEntry> initKeyCache()
     {
         logger.info("Initializing key cache with capacity of {} MBs.", 
DatabaseDescriptor.getKeyCacheSizeInMB());
 
@@ -99,8 +100,8 @@ public class CacheService implements CacheServiceMBean
 
         // as values are constant size we can use singleton weigher
         // where 48 = 40 bytes (average size of the key) + 8 bytes (size of 
value)
-        ICache<KeyCacheKey, Long> kc = 
ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / 
AVERAGE_KEY_CACHE_ROW_SIZE);
-        AutoSavingCache<KeyCacheKey, Long> keyCache = new 
AutoSavingCache<KeyCacheKey, Long>(kc, CacheType.KEY_CACHE);
+        ICache<KeyCacheKey, RowIndexEntry> kc = 
ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / 
AVERAGE_KEY_CACHE_ROW_SIZE);
+        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new 
AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE);
 
         int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index b547bf2..8225863 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.CacheService;
@@ -86,7 +87,7 @@ public class StatusLogger
                                   "MessagingService", "n/a", pendingCommands + 
"," + pendingResponses));
 
         // Global key/row cache information
-        AutoSavingCache<KeyCacheKey, Long> keyCache = 
CacheService.instance.keyCache;
+        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = 
CacheService.instance.keyCache;
         AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = 
CacheService.instance.rowCache;
 
         int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 0a376b4..1f6a799 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -20,8 +20,7 @@ package org.apache.cassandra;
  *
  */
 
-import java.io.EOFException;
-import java.io.IOException;
+import java.io.*;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
@@ -56,6 +55,11 @@ public class Util
         return 
StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(key));
     }
 
+    public static DecoratedKey dk(ByteBuffer key)
+    {
+        return StorageService.getPartitioner().decorateKey(key);
+    }
+
     public static RowPosition rp(String key)
     {
         return rp(key, StorageService.getPartitioner());
@@ -256,4 +260,12 @@ public class Util
 
         assert thrown : exception.getName() + " not received";
     }
+
+    public static ByteBuffer serializeForSSTable(ColumnFamily cf)
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        cf.serializer().serializeForSSTable(cf, dos);
+        return ByteBuffer.wrap(baos.toByteArray());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index a47e2ea..01add3f 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -72,7 +72,7 @@ public class KeyCacheTest extends CleanupHelper
         assertEquals(100, CacheService.instance.keyCache.size());
 
         // really? our caches don't implement the map interface? (hence no 
.addAll)
-        Map<KeyCacheKey, Long> savedMap = new HashMap<KeyCacheKey, Long>();
+        Map<KeyCacheKey, RowIndexEntry> savedMap = new HashMap<KeyCacheKey, 
RowIndexEntry>();
         for (KeyCacheKey k : CacheService.instance.keyCache.getKeySet())
         {
             savedMap.put(k, CacheService.instance.keyCache.get(k));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/db/TableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableTest.java b/test/unit/org/apache/cassandra/db/TableTest.java
index b5a3869..f3e090e 100644
--- a/test/unit/org/apache/cassandra/db/TableTest.java
+++ b/test/unit/org/apache/cassandra/db/TableTest.java
@@ -404,14 +404,8 @@ public class TableTest extends CleanupHelper
         }
         // verify that we do indeed have multiple index entries
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
-        long position = sstable.getPosition(key, SSTableReader.Operator.EQ);
-        RandomAccessReader file = sstable.openDataReader(false);
-        file.seek(position);
-        assert ByteBufferUtil.readWithShortLength(file).equals(key.key);
-        SSTableReader.readRowSize(file, sstable.descriptor);
-        IndexHelper.skipBloomFilter(file);
-        ArrayList<IndexHelper.IndexInfo> indexes = 
IndexHelper.deserializeIndex(file);
-        assert indexes.size() > 2;
+        RowIndexEntry indexEntry = sstable.getPosition(key, 
SSTableReader.Operator.EQ);
+        assert indexEntry.columnsIndex().size() > 2;
 
         validateSliceLarge(cfStore);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index 1704248..ef2adfa 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -108,8 +108,8 @@ public class LazilyCompactedRowTest extends CleanupHelper
             new FileOutputStream(tmpFile1).write(out1.getData()); // writing 
data from row1
             new FileOutputStream(tmpFile2).write(out2.getData()); // writing 
data from row2
 
-            MappedFileDataInput in1 = new MappedFileDataInput(new 
FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0);
-            MappedFileDataInput in2 = new MappedFileDataInput(new 
FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0);
+            MappedFileDataInput in1 = new MappedFileDataInput(new 
FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0, 0);
+            MappedFileDataInput in2 = new MappedFileDataInput(new 
FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0, 0);
 
             // key isn't part of what CompactedRow writes, that's done by 
SSTW.append
 
@@ -118,18 +118,6 @@ public class LazilyCompactedRowTest extends CleanupHelper
             long rowSize2 = SSTableReader.readRowSize(in2, 
sstables.iterator().next().descriptor);
             assertEquals(rowSize1 + 8, out1.getLength());
             assertEquals(rowSize2 + 8, out2.getLength());
-            // bloom filter
-            IndexHelper.defreezeBloomFilter(in1, rowSize1, false);
-            IndexHelper.defreezeBloomFilter(in2, rowSize2, false);
-            // index
-            int indexSize1 = in1.readInt();
-            int indexSize2 = in2.readInt();
-            assertEquals(indexSize1, indexSize2);
-
-            ByteBuffer bytes1 = in1.readBytes(indexSize1);
-            ByteBuffer bytes2 = in2.readBytes(indexSize2);
-
-            assert bytes1.equals(bytes2);
 
             // cf metadata
             ColumnFamily cf1 = ColumnFamily.create(cfs.metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index fede053..c54c912 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -123,7 +123,7 @@ public class SSTableReaderTest extends CleanupHelper
         for (int j = 0; j < 100; j += 2)
         {
             DecoratedKey dk = Util.dk(String.valueOf(j));
-            FileDataInput file = sstable.getFileDataInput(dk, 
DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 1024);
+            FileDataInput file = 
sstable.getFileDataInput(sstable.getPosition(dk, 
SSTableReader.Operator.EQ).position);
             DecoratedKey keyInDisk = 
SSTableReader.decodeKey(sstable.partitioner,
                                                              
sstable.descriptor,
                                                              
ByteBufferUtil.readWithShortLength(file));
@@ -134,7 +134,7 @@ public class SSTableReaderTest extends CleanupHelper
         for (int j = 1; j < 110; j += 2)
         {
             DecoratedKey dk = Util.dk(String.valueOf(j));
-            assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == -1;
+            assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == null;
         }
     }
 
@@ -184,10 +184,10 @@ public class SSTableReaderTest extends CleanupHelper
         CompactionManager.instance.performMaximal(store);
 
         SSTableReader sstable = store.getSSTables().iterator().next();
-        long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ);
-        long p3 = sstable.getPosition(k(3), SSTableReader.Operator.EQ);
-        long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ);
-        long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ);
+        long p2 = sstable.getPosition(k(2), 
SSTableReader.Operator.EQ).position;
+        long p3 = sstable.getPosition(k(3), 
SSTableReader.Operator.EQ).position;
+        long p6 = sstable.getPosition(k(6), 
SSTableReader.Operator.EQ).position;
+        long p7 = sstable.getPosition(k(7), 
SSTableReader.Operator.EQ).position;
 
         Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), 
t(6))).iterator().next();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
index 34f3226..12f2747 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
@@ -25,24 +25,30 @@ import java.util.*;
 
 import org.junit.Test;
 
-import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.Util;
 
-public class SSTableTest extends CleanupHelper
+public class SSTableTest extends SchemaLoader
 {
     @Test
-    public void testSingleWrite() throws IOException {
+    public void testSingleWrite() throws IOException
+    {
         // write test data
         ByteBuffer key = ByteBufferUtil.bytes(Integer.toString(1));
-        ByteBuffer bytes = ByteBuffer.wrap(new byte[1024]);
-        new Random().nextBytes(bytes.array());
+        ByteBuffer cbytes = ByteBuffer.wrap(new byte[1024]);
+        new Random().nextBytes(cbytes.array());
+        ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1");
+        cf.addColumn(null, new Column(cbytes, cbytes));
 
-        Map<ByteBuffer, ByteBuffer> map = new HashMap<ByteBuffer,ByteBuffer>();
-        map.put(key, bytes);
-        SSTableReader ssTable = 
SSTableUtils.prepare().cf("Standard1").writeRaw(map);
+        SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, 
ColumnFamily>();
+        map.put(Util.dk(key), cf);
+        SSTableReader ssTable = 
SSTableUtils.prepare().cf("Standard1").write(map);
 
         // verify
+        ByteBuffer bytes = Util.serializeForSSTable(cf);
         verifySingle(ssTable, bytes, key);
         ssTable = SSTableReader.open(ssTable.descriptor); // read the index 
from disk
         verifySingle(ssTable, bytes, key);
@@ -51,7 +57,7 @@ public class SSTableTest extends CleanupHelper
     private void verifySingle(SSTableReader sstable, ByteBuffer bytes, 
ByteBuffer key) throws IOException
     {
         RandomAccessReader file = sstable.openDataReader(false);
-        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), 
SSTableReader.Operator.EQ));
+        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), 
SSTableReader.Operator.EQ).position);
         assert key.equals(ByteBufferUtil.readWithShortLength(file));
         int size = (int)SSTableReader.readRowSize(file, sstable.descriptor);
         byte[] bytes2 = new byte[size];
@@ -60,20 +66,27 @@ public class SSTableTest extends CleanupHelper
     }
 
     @Test
-    public void testManyWrites() throws IOException {
-        Map<ByteBuffer, ByteBuffer> map = new HashMap<ByteBuffer,ByteBuffer>();
-        for (int i = 100; i < 1000; ++i)
+    public void testManyWrites() throws IOException
+    {
+        SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, 
ColumnFamily>();
+        SortedMap<ByteBuffer, ByteBuffer> bytesMap = new TreeMap<ByteBuffer, 
ByteBuffer>();
+        //for (int i = 100; i < 1000; ++i)
+        for (int i = 100; i < 300; ++i)
         {
-            map.put(ByteBufferUtil.bytes(Integer.toString(i)), 
ByteBufferUtil.bytes(("Avinash Lakshman is a good man: " + i)));
+            ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard2");
+            ByteBuffer bytes = ByteBufferUtil.bytes(("Avinash Lakshman is a 
good man: " + i));
+            cf.addColumn(null, new Column(bytes, bytes));
+            map.put(Util.dk(Integer.toString(i)), cf);
+            bytesMap.put(ByteBufferUtil.bytes(Integer.toString(i)), 
Util.serializeForSSTable(cf));
         }
 
         // write
-        SSTableReader ssTable = 
SSTableUtils.prepare().cf("Standard2").writeRaw(map);
+        SSTableReader ssTable = 
SSTableUtils.prepare().cf("Standard2").write(map);
 
         // verify
-        verifyMany(ssTable, map);
+        verifyMany(ssTable, bytesMap);
         ssTable = SSTableReader.open(ssTable.descriptor); // read the index 
from disk
-        verifyMany(ssTable, map);
+        verifyMany(ssTable, bytesMap);
 
         Set<Component> live = SSTable.componentsFor(ssTable.descriptor);
         assert !live.isEmpty() : "SSTable has no live components";
@@ -84,11 +97,11 @@ public class SSTableTest extends CleanupHelper
     private void verifyMany(SSTableReader sstable, Map<ByteBuffer, ByteBuffer> 
map) throws IOException
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>(map.keySet());
-        Collections.shuffle(keys);
+        //Collections.shuffle(keys);
         RandomAccessReader file = sstable.openDataReader(false);
         for (ByteBuffer key : keys)
         {
-            
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), 
SSTableReader.Operator.EQ));
+            
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), 
SSTableReader.Operator.EQ).position);
             assert key.equals( ByteBufferUtil.readWithShortLength(file));
             int size = (int)SSTableReader.readRowSize(file, 
sstable.descriptor);
             byte[] bytes2 = new byte[size];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 038558f..72239dd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -179,12 +179,8 @@ public class SSTableUtils
             return write(map);
         }
 
-        public SSTableReader write(Map<String, ColumnFamily> entries) throws 
IOException
+        public SSTableReader write(SortedMap<DecoratedKey, ColumnFamily> 
sorted) throws IOException
         {
-            SortedMap<DecoratedKey, ColumnFamily> sorted = new 
TreeMap<DecoratedKey, ColumnFamily>();
-            for (Map.Entry<String, ColumnFamily> entry : entries.entrySet())
-                sorted.put(Util.dk(entry.getKey()), entry.getValue());
-
             final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter = 
sorted.entrySet().iterator();
             return write(sorted.size(), new Appender()
             {
@@ -200,30 +196,13 @@ public class SSTableUtils
             });
         }
 
-        /**
-         * @Deprecated: Writes the binary content of a row, which should be 
encapsulated.
-         */
-        @Deprecated
-        public SSTableReader writeRaw(Map<ByteBuffer, ByteBuffer> entries) 
throws IOException
+        public SSTableReader write(Map<String, ColumnFamily> entries) throws 
IOException
         {
-            File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, 
generation) : new File(dest.filenameFor(Component.DATA));
-            SSTableWriter writer = new 
SSTableWriter(datafile.getAbsolutePath(), entries.size());
-            SortedMap<DecoratedKey, ByteBuffer> sorted = new 
TreeMap<DecoratedKey, ByteBuffer>();
-            for (Map.Entry<ByteBuffer, ByteBuffer> entry : entries.entrySet())
-                sorted.put(writer.partitioner.decorateKey(entry.getKey()), 
entry.getValue());
-            final Iterator<Map.Entry<DecoratedKey, ByteBuffer>> iter = 
sorted.entrySet().iterator();
-            return write(sorted.size(), new Appender()
-            {
-                @Override
-                public boolean append(SSTableWriter writer) throws IOException
-                {
-                    if (!iter.hasNext())
-                        return false;
-                    Map.Entry<DecoratedKey, ByteBuffer> entry = iter.next();
-                    writer.append(entry.getKey(), entry.getValue());
-                    return true;
-                }
-            });
+            SortedMap<DecoratedKey, ColumnFamily> sorted = new 
TreeMap<DecoratedKey, ColumnFamily>();
+            for (Map.Entry<String, ColumnFamily> entry : entries.entrySet())
+                sorted.put(Util.dk(entry.getKey()), entry.getValue());
+
+            return write(sorted);
         }
 
         public SSTableReader write(int expectedSize, Appender appender) throws 
IOException
