Author: jbellis
Date: Sun Aug 21 04:50:55 2011
New Revision: 1159942

URL: http://svn.apache.org/viewvc?rev=1159942&view=rev
Log:
Stop reading from sstables once we know we have the most recent columns
patch by Daniel Lundin and jbellis for CASSANDRA-2498

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sun Aug 21 04:50:55 2011
@@ -35,6 +35,9 @@
  * fix missing logging for some exceptions (CASSANDRA-2061)
  * refactor and optimize ColumnFamilyStore.files(...) and 
Descriptor.fromFilename(String)
    and few other places responsible for work with SSTable files 
(CASSANDRA-3040)
+ * Stop reading from sstables once we know we have the most recent columns,
+   for query-by-name requests (CASSANDRA-2498)
+
 
 0.8.5
  * fix NPE when encryption_options is unspecified (CASSANDRA-3007)

Added: cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1159942&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java 
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java 
Sun Aug 21 04:50:55 2011
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Gathers the columns selected by a {@link QueryFilter} from the memtables and sstables of a
+ * {@link DataTracker.View} and collates them into a single {@link ColumnFamily}.
+ *
+ * Two strategies exist; {@link #getTopLevelColumns()} picks one.  The sstable counter is only
+ * ever incremented, never reset, so a fresh instance should be used for each query.
+ */
+public class CollationController
+{
+    private static final Logger logger = LoggerFactory.getLogger(CollationController.class);
+
+    private final DataTracker.View dataview;
+    private final ISortedColumns.Factory factory;
+    private final QueryFilter filter;
+    private final int gcBefore;
+    private final CFMetaData metadata;
+
+    // number of sstables whose iterator returned a non-null ColumnFamily for the queried row
+    private int sstablesIterated = 0;
+
+    /**
+     * @param dataview the memtables and sstables to read from
+     * @param factory  handed to ColumnFamily.create for the result container
+     * @param filter   the query to evaluate
+     * @param metadata supplies the comparator and the default validator
+     * @param gcBefore passed through unchanged to filter.collateColumns
+     */
+    public CollationController(DataTracker.View dataview, ISortedColumns.Factory factory, QueryFilter filter, CFMetaData metadata, int gcBefore)
+    {
+        this.dataview = dataview;
+        this.factory = factory;
+        this.filter = filter;
+        this.gcBefore = gcBefore;
+        this.metadata = metadata;
+    }
+
+    /**
+     * Runs the query: by-name filters take the time-ordered path, everything else is merged
+     * from all sources.
+     *
+     * @return the collated columns, or null if no iterator was opened at all
+     */
+    public ColumnFamily getTopLevelColumns()
+    {
+        // NOTE(review): counter column families are excluded from the time-ordered path, presumably
+        // because a counter value has to be merged from every sstable -- confirm.
+        return filter.filter instanceof NamesQueryFilter && metadata.getDefaultValidator() != CounterColumnType.instance
+               ? collectTimeOrderedData()
+               : collectAllData();
+    }
+
+    /**
+     * Collects data in order of recency, using the sstable maxtimestamp data.
+     * Once we have data for all requested columns that is newer than the newest remaining maxtimestamp,
+     * we stop.
+     */
+    private ColumnFamily collectTimeOrderedData()
+    {
+        logger.debug("collectTimeOrderedData");
+        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
+        final ColumnFamily container = ColumnFamily.create(metadata, factory, filter.filter.isReversed());
+
+        try
+        {
+            // memtables are always read in full; their columns seed the container
+            for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable)))
+            {
+                IColumnIterator iter = filter.getMemtableColumnIterator(memtable, metadata.comparator);
+                if (iter != null)
+                {
+                    iterators.add(iter);
+                    container.delete(iter.getColumnFamily());
+                    while (iter.hasNext())
+                        container.addColumn(iter.next());
+                }
+            }
+
+            // avoid changing the filter columns of the original filter
+            // (reduceNameFilter removes columns that are known to be irrelevant)
+            TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(metadata.comparator);
+            filterColumns.addAll(((NamesQueryFilter) filter.filter).columns);
+            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, new NamesQueryFilter(filterColumns));
+
+            // read sorted sstables (the view keeps them ordered by descending max timestamp)
+            for (SSTableReader sstable : dataview.sstables)
+            {
+                long currentMaxTs = sstable.getMaxTimestamp();
+                reduceNameFilter(reducedFilter, container, currentMaxTs);
+                // every requested column already has a version newer than anything left on disk
+                if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty())
+                    break;
+
+                IColumnIterator iter = reducedFilter.getSSTableColumnIterator(sstable);
+                // registered before the null check so that it is closed in the finally block
+                iterators.add(iter);
+                if (iter.getColumnFamily() != null)
+                {
+                    container.delete(iter.getColumnFamily());
+                    sstablesIterated++;
+                    while (iter.hasNext())
+                        container.addColumn(iter.next());
+                }
+            }
+        }
+        finally
+        {
+            // safe to close here: every iterator has already been drained into the container
+            for (IColumnIterator iter : iterators)
+                FileUtils.closeQuietly(iter);
+        }
+
+        // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
+        // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
+        if (iterators.isEmpty())
+            return null;
+
+        // do a final collate.  toCollate is boilerplate required to provide a CloseableIterator
+        CloseableIterator<IColumn> toCollate = new SimpleAbstractColumnIterator()
+        {
+            final Iterator<IColumn> iter = container.iterator();
+
+            protected IColumn computeNext()
+            {
+                return iter.hasNext() ? iter.next() : endOfData();
+            }
+
+            public ColumnFamily getColumnFamily()
+            {
+                return container;
+            }
+
+            public DecoratedKey getKey()
+            {
+                return filter.key;
+            }
+        };
+        ColumnFamily returnCF = container.cloneMeShallow();
+        filter.collateColumns(returnCF, Collections.singletonList(toCollate), metadata.comparator, gcBefore);
+
+        // Caller is responsible for final removeDeletedCF.  This is important for cacheRow to work correctly:
+        return returnCF;
+    }
+
+    /**
+     * Prunes, in place, the column names of a by-name filter: a name is removed when returnCF
+     * already holds that column with a minTimestamp() greater than sstableTimestamp.
+     *
+     * @param filter           by-name filter whose column set is reduced
+     * @param returnCF         the columns collected so far
+     * @param sstableTimestamp max timestamp of the sstable about to be read
+     */
+    private void reduceNameFilter(QueryFilter filter, ColumnFamily returnCF, long sstableTimestamp)
+    {
+        // for a super column query the names refer to sub columns, so look inside the super column
+        AbstractColumnContainer container = filter.path.superColumnName != null
+                                          ? (SuperColumn) returnCF.getColumn(filter.path.superColumnName)
+                                          : returnCF;
+        // super column not collected yet: nothing can be ruled out
+        if (container == null)
+            return;
+
+        for (Iterator<ByteBuffer> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext(); )
+        {
+            ByteBuffer filterColumn = iterator.next();
+            IColumn column = container.getColumn(filterColumn);
+            if (column != null && column.minTimestamp() > sstableTimestamp)
+                iterator.remove();
+        }
+    }
+
+    /**
+     * Collects data the brute-force way: gets an iterator for the filter in question
+     * from every memtable and sstable, then merges them together.
+     */
+    private ColumnFamily collectAllData()
+    {
+        logger.debug("collectAllData");
+        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
+        ColumnFamily returnCF = ColumnFamily.create(metadata, factory, filter.filter.isReversed());
+
+        try
+        {
+            for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable)))
+            {
+                IColumnIterator iter = filter.getMemtableColumnIterator(memtable, metadata.comparator);
+                if (iter != null)
+                {
+                    returnCF.delete(iter.getColumnFamily());
+                    iterators.add(iter);
+                }
+            }
+
+            /* add the SSTables on disk */
+            for (SSTableReader sstable : dataview.sstables)
+            {
+                IColumnIterator iter = filter.getSSTableColumnIterator(sstable);
+                // registered before the null check so that it is closed in the finally block
+                iterators.add(iter);
+                if (iter.getColumnFamily() != null)
+                {
+                    returnCF.delete(iter.getColumnFamily());
+                    sstablesIterated++;
+                }
+            }
+
+            // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
+            // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
+            if (iterators.isEmpty())
+                return null;
+
+            // The iterators are consumed here, so the collation has to happen before the finally
+            // block closes them.
+            filter.collateColumns(returnCF, iterators, metadata.comparator, gcBefore);
+
+            // Caller is responsible for final removeDeletedCF.  This is important for cacheRow to work correctly:
+            return returnCF;
+        }
+        finally
+        {
+            for (IColumnIterator iter : iterators)
+                FileUtils.closeQuietly(iter);
+        }
+    }
+
+    /**
+     * @return how many sstables yielded a non-null ColumnFamily for the queried row so far
+     */
+    public int getSstablesIterated()
+    {
+        return sstablesIterated;
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Sun Aug 21 
04:50:55 2011
@@ -104,6 +104,11 @@ public class Column implements IColumn
         return timestamp;
     }
 
+    /**
+     * A standard column carries a single timestamp, so its minimum is that timestamp.
+     */
+    public long minTimestamp()
+    {
+        return this.timestamp;
+    }
+
     public boolean isMarkedForDelete()
     {
         return false;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun 
Aug 21 04:50:55 2011
@@ -1278,73 +1278,17 @@ public class ColumnFamilyStore implement
 
     private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, 
ISortedColumns.Factory factory)
     {
-        // we are querying top-level columns, do a merging fetch with indexes.
-        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
-        final ColumnFamily returnCF = ColumnFamily.create(metadata, factory, 
filter.filter.isReversed());
         DataTracker.View currentView = markCurrentViewReferenced();
         try
         {
-            IColumnIterator iter;
-            int sstablesToIterate = 0;
-
-            /* add the current memtable */
-            iter = filter.getMemtableColumnIterator(currentView.memtable, 
getComparator());
-            if (iter != null)
-            {
-                returnCF.delete(iter.getColumnFamily());
-                iterators.add(iter);
-            }
-
-            /* add the memtables being flushed */
-            for (Memtable memtable : currentView.memtablesPendingFlush)
-            {
-                iter = filter.getMemtableColumnIterator(memtable, 
getComparator());
-                if (iter != null)
-                {
-                    returnCF.delete(iter.getColumnFamily());
-                    iterators.add(iter);
-                }
-            }
-
-            /* add the SSTables on disk */
-            for (SSTableReader sstable : currentView.sstables)
-            {
-                iter = filter.getSSTableColumnIterator(sstable);
-                if (iter.getColumnFamily() != null)
-                {
-                    returnCF.delete(iter.getColumnFamily());
-                    iterators.add(iter);
-                    sstablesToIterate++;
-                }
-            }
-
-            recentSSTablesPerRead.add(sstablesToIterate);
-            sstablesPerRead.add(sstablesToIterate);
-
-            // we need to distinguish between "there is no data at all for 
this row" (BF will let us rebuild that efficiently)
-            // and "there used to be data, but it's gone now" (we should cache 
the empty CF so we don't need to rebuild that slower)
-            if (iterators.size() == 0)
-                return null;
-
-            filter.collateColumns(returnCF, iterators, getComparator(), 
gcBefore);
-
-            // Caller is responsible for final removeDeletedCF.  This is 
important for cacheRow to work correctly:
-            return returnCF;
+            CollationController controller = new 
CollationController(currentView, factory, filter, metadata, gcBefore);
+            ColumnFamily columns = controller.getTopLevelColumns();
+            recentSSTablesPerRead.add(controller.getSstablesIterated());
+            sstablesPerRead.add(controller.getSstablesIterated());
+            return columns;
         }
         finally
         {
-            /* close all cursors */
-            for (IColumnIterator ci : iterators)
-            {
-                try
-                {
-                    ci.close();
-                }
-                catch (Throwable th)
-                {
-                    logger.error("error closing " + ci, th);
-                }
-            }
             SSTableReader.releaseReferences(currentView.sstables);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Sun Aug 
21 04:50:55 2011
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
 
@@ -66,7 +69,7 @@ public class DataTracker
         return view.get().memtablesPendingFlush;
     }
 
-    public Set<SSTableReader> getSSTables()
+    public List<SSTableReader> getSSTables()
     {
         return view.get().sstables;
     }
@@ -242,7 +245,7 @@ public class DataTracker
     {
         view.set(new View(new Memtable(cfstore),
                           Collections.<Memtable>emptySet(),
-                          Collections.<SSTableReader>emptySet(),
+                          Collections.<SSTableReader>emptyList(),
                           Collections.<SSTableReader>emptySet()));
     }
 
@@ -461,10 +464,15 @@ public class DataTracker
     {
         public final Memtable memtable;
         public final Set<Memtable> memtablesPendingFlush;
-        public final Set<SSTableReader> sstables;
         public final Set<SSTableReader> compacting;
+        // We can't use a SortedSet here because "the ordering maintained by a 
sorted set (whether or not an
+        // explicit comparator is provided) must be <i>consistent with 
equals</i>."  In particular,
+        // ImmutableSortedSet will ignore any objects that compare equally 
with an existing Set member.
+        // Obviously, dropping sstables whose max column timestamp happens to 
be equal to another's
+        // is not acceptable for us.  So, we use a List instead.
+        public final List<SSTableReader> sstables;
 
-        View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> 
sstables, Set<SSTableReader> compacting)
+        View(Memtable memtable, Set<Memtable> pendingFlush, 
List<SSTableReader> sstables, Set<SSTableReader> compacting)
         {
             this.memtable = memtable;
             this.memtablesPendingFlush = pendingFlush;
@@ -486,15 +494,14 @@ public class DataTracker
         public View replaceFlushed(Memtable flushedMemtable, SSTableReader 
newSSTable)
         {
             Set<Memtable> newPending = 
ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush, 
Collections.singleton(flushedMemtable)));
-            Set<SSTableReader> newSSTables = 
ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build();
-            return new View(memtable, newPending, newSSTables, compacting);
+            List<SSTableReader> newSSTables = newSSTables(newSSTable);
+            return new View(memtable, newPending, 
Collections.unmodifiableList(newSSTables), compacting);
         }
 
         public View replace(Collection<SSTableReader> oldSSTables, 
Iterable<SSTableReader> replacements)
         {
-            Sets.SetView<SSTableReader> remaining = Sets.difference(sstables, 
ImmutableSet.copyOf(oldSSTables));
-            Set<SSTableReader> newSSTables = 
ImmutableSet.<SSTableReader>builder().addAll(remaining).addAll(replacements).build();
-            return new View(memtable, memtablesPendingFlush, newSSTables, 
compacting);
+            List<SSTableReader> newSSTables = newSSTables(oldSSTables, 
replacements);
+            return new View(memtable, memtablesPendingFlush, 
Collections.unmodifiableList(newSSTables), compacting);
         }
 
         public View markCompacting(Collection<SSTableReader> tomark)
@@ -508,5 +515,27 @@ public class DataTracker
             Set<SSTableReader> compactingNew = 
ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
             return new View(memtable, memtablesPendingFlush, sstables, 
compactingNew);
         }
+
+        /**
+         * Builds the sstable list that results from adding a single sstable and removing none.
+         */
+        private List<SSTableReader> newSSTables(SSTableReader newSSTable)
+        {
+            // not performance-sensitive, don't obsess over doing a selection merge here
+            List<SSTableReader> nothingRemoved = Collections.emptyList();
+            List<SSTableReader> added = Collections.singletonList(newSSTable);
+            return newSSTables(nothingRemoved, added);
+        }
+
+        /**
+         * Builds a new, mutable sstable list: the current sstables minus oldSSTables, plus
+         * replacements, sorted with SSTable.maxTimestampComparator.
+         */
+        private List<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+        {
+            Set<SSTableReader> removed = ImmutableSet.copyOf(oldSSTables);
+            int expectedSize = sstables.size() - oldSSTables.size() + Iterables.size(replacements);
+            List<SSTableReader> merged = new ArrayList<SSTableReader>(expectedSize);
+
+            // keep every current sstable that is not being replaced
+            for (SSTableReader current : sstables)
+            {
+                if (removed.contains(current))
+                    continue;
+                merged.add(current);
+            }
+            Iterables.addAll(merged, replacements);
+
+            // fails if oldSSTables held duplicates or sstables that are not part of this view
+            assert merged.size() == expectedSize;
+            Collections.sort(merged, SSTable.maxTimestampComparator);
+            return merged;
+        }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Sun Aug 21 
04:50:55 2011
@@ -73,7 +73,13 @@ public interface IColumn
 
     /**
      * For a standard column, this is the same as timestamp().
-     * For a super column, this is max the column value timestamp of the sub 
columns.
+     * For a super column, this is the max column timestamp of the sub columns.
      */
     public long maxTimestamp();
+
+    /**
+     * For a standard column, this is the same as timestamp().
+     * For a super column, this is the smallest value among the super column's own
+     * getMarkedForDeleteAt() and the timestamps of its sub columns.
+     */
+    public long minTimestamp();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Sun Aug 
21 04:50:55 2011
@@ -132,11 +132,18 @@ public class SuperColumn extends Abstrac
 
     public long maxTimestamp()
     {
-        long maxTimestamp = Long.MIN_VALUE;
+        long maxTimestamp = getMarkedForDeleteAt();
         for (IColumn subColumn : getSubColumns())
             maxTimestamp = Math.max(maxTimestamp, subColumn.maxTimestamp());
+        return maxTimestamp;
+    }
 
-        return Math.max(maxTimestamp, getMarkedForDeleteAt());
+    /**
+     * Smallest timestamp held by this super column: the minimum over getMarkedForDeleteAt()
+     * and every sub column's minTimestamp().
+     */
+    public long minTimestamp()
+    {
+        // NOTE(review): if getMarkedForDeleteAt() is Long.MIN_VALUE for a super column that was
+        // never deleted (the value maxTimestamp() used to be seeded with), this always returns
+        // Long.MIN_VALUE -- confirm that is intended.
+        long minTimestamp = getMarkedForDeleteAt();
+        for (IColumn subColumn : getSubColumns())
+            minTimestamp = Math.min(minTimestamp, subColumn.minTimestamp());
+        return minTimestamp;
     }
 
     public long mostRecentLiveChangeAt()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
 Sun Aug 21 04:50:55 2011
@@ -104,7 +104,7 @@ public class SSTableSliceIterator implem
 
     public boolean hasNext()
     {
-        return reader.hasNext();
+        return reader != null && reader.hasNext();
     }
 
     public IColumn next()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Sun 
Aug 21 04:50:55 2011
@@ -22,9 +22,7 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +58,16 @@ public abstract class SSTable
 
     public static final String TEMPFILE_MARKER = "tmp";
 
+    /**
+     * Orders sstables by max timestamp, newest first.  Sstables with the same max timestamp
+     * compare as equal.
+     */
+    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            long first = o1.getMaxTimestamp();
+            long second = o2.getMaxTimestamp();
+            if (first == second)
+                return 0;
+            // the larger timestamp sorts earlier
+            return first > second ? -1 : 1;
+        }
+    };
+
     public final Descriptor descriptor;
     protected final Set<Component> components;
     public final CFMetaData metadata;

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java 
(original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java 
Sun Aug 21 04:50:55 2011
@@ -53,6 +53,7 @@ import static org.apache.cassandra.Util.
 import static org.apache.cassandra.Util.getBytes;
 import static org.junit.Assert.assertNull;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 public class ColumnFamilyStoreTest extends CleanupHelper
@@ -69,21 +70,48 @@ public class ColumnFamilyStoreTest exten
     }
 
     @Test
+    // create two sstables, and verify that we only deserialize data from the most recent one
+    public void testTimeSortedQuery() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Standard1");
+        cfs.truncate().get();
+
+        // write Column1 twice (timestamps 0 and 1), flushing after each write so that every
+        // version ends up in its own sstable
+        for (int timestamp = 0; timestamp <= 1; timestamp++)
+        {
+            RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
+            QueryPath columnPath = new QueryPath("Standard1", null, ByteBufferUtil.bytes("Column1"));
+            rm.add(columnPath, ByteBufferUtil.bytes("asdf"), timestamp);
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+
+        cfs.getRecentSSTablesPerReadHistogram(); // resets counts
+        QueryFilter byName = QueryFilter.getNamesFilter(Util.dk("key1"), new QueryPath("Standard1", null), ByteBufferUtil.bytes("Column1"));
+        cfs.getColumnFamily(byName);
+        assertEquals(1, cfs.getRecentSSTablesPerReadHistogram()[0]);
+    }
+
+    @Test
     public void testGetColumnWithWrongBF() throws IOException, 
ExecutionException, InterruptedException
     {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
+        cfs.truncate().get();
+
         List<IMutation> rms = new LinkedList<IMutation>();
         RowMutation rm;
         rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
         rm.add(new QueryPath("Standard1", null, 
ByteBufferUtil.bytes("Column1")), ByteBufferUtil.bytes("asdf"), 0);
         rm.add(new QueryPath("Standard1", null, 
ByteBufferUtil.bytes("Column2")), ByteBufferUtil.bytes("asdf"), 0);
         rms.add(rm);
-        ColumnFamilyStore store = Util.writeColumnFamily(rms);
+        Util.writeColumnFamily(rms);
 
-        Table table = Table.open("Keyspace1");
         List<SSTableReader> ssTables = table.getAllSSTables();
         assertEquals(1, ssTables.size());
         ssTables.get(0).forceFilterFailures();
-        ColumnFamily cf = 
store.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), new 
QueryPath("Standard1", null, ByteBufferUtil.bytes("Column1"))));
+        ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), new 
QueryPath("Standard1", null, ByteBufferUtil.bytes("Column1"))));
         assertNull(cf);
     }
 

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
 Sun Aug 21 04:50:55 2011
@@ -185,6 +185,7 @@ public class CompactionsTest extends Cle
                 store.forceBlockingFlush();
         }
         Collection<SSTableReader> toCompact = store.getSSTables();
+        assert toCompact.size() == 2;
 
         // Reinserting the same keys. We will compact only the previous 
sstable, but we need those new ones
         // to make sure we use EchoedRow, otherwise it won't be used because 
purge can be done.
@@ -200,12 +201,15 @@ public class CompactionsTest extends Cle
         for (SSTableReader sstable : store.getSSTables())
             if (!toCompact.contains(sstable))
                 tmpSSTable = sstable;
+        assert tmpSSTable != null;
 
         // Force compaction on first sstables. Since each row is in only one 
sstable, we will be using EchoedRow.
         Util.compact(store, toCompact, false);
+        assertEquals(2, store.getSSTables().size());
 
         // Now, we remove the sstable that was just created to force the use 
of EchoedRow (so that it doesn't hide the problem)
         store.markCompacted(Collections.singleton(tmpSSTable));
+        assertEquals(1, store.getSSTables().size());
 
         // Now assert we do have the 4 keys
         assertEquals(4, Util.getRangeSlice(store).size());


Reply via email to