Optimize reads when row deletion timestamps allow us to restrict the sstables 
we check
patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-4116


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a04d854
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a04d854
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a04d854

Branch: refs/heads/trunk
Commit: 3a04d85488b32c106ccfec76092b187f5728b3dd
Parents: 116d8a1
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Tue May 1 23:26:17 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue May 1 23:26:17 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../apache/cassandra/db/CollationController.java   |   48 ++++++++-
 .../cassandra/db/CollationControllerTest.java      |   83 +++++++++++++++
 3 files changed, 127 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a04d854/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 918c146..c37dd93 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.1.1-dev
+ * Optimize reads when row deletion timestamps allow us to restrict
+   the set of sstables we check (CASSANDRA-4116)
  * incremental repair by token range (CASSANDRA-3912)
  * streaming commitlog backup + pitr (CASSANDRA-3690)
  * avoid generating redundant compaction tasks during streaming

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a04d854/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 5289d1b..c73e1e0 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,8 +106,15 @@ public class CollationController
             Collections.sort(view.sstables, SSTable.maxTimestampComparator);
 
             // read sorted sstables
+            long mostRecentRowTombstone = Long.MIN_VALUE;
             for (SSTableReader sstable : view.sstables)
             {
+                // if we've already seen a row tombstone with a timestamp greater
+                // than the most recent update to this sstable, we're done, since the rest of the sstables
+                // will also be older
+                if (sstable.getMaxTimestamp() < mostRecentRowTombstone)
+                    break;
+                
                 long currentMaxTs = sstable.getMaxTimestamp();
                 reduceNameFilter(reducedFilter, container, currentMaxTs);
                 if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty())
@@ -115,7 +124,14 @@ public class CollationController
                 iterators.add(iter);
                 if (iter.getColumnFamily() != null)
                 {
-                    container.delete(iter.getColumnFamily());
+                    ColumnFamily cf = iter.getColumnFamily();
+                    if (cf.isMarkedForDelete())
+                    {
+                        // track the most recent row level tombstone we encounter
+                        mostRecentRowTombstone = cf.getMarkedForDeleteAt();
+                    }
+                    
+                    container.delete(cf);
                     sstablesIterated++;
                     while (iter.hasNext())
                         container.addColumn(iter.next());
@@ -212,10 +228,10 @@ public class CollationController
         ISortedColumns.Factory factory = mutableColumns
                                        ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory()
                                        : ArrayBackedSortedColumns.factory();
-        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
+        ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key);
+        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(Iterables.size(view.memtables) + view.sstables.size());
         ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed());
 
-        ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key);
         try
         {
             for (Memtable memtable : view.memtables)
@@ -227,17 +243,37 @@ public class CollationController
                     iterators.add(iter);
                 }
             }
-
+            
+            long mostRecentRowTombstone = Long.MIN_VALUE;
+            Map<IColumnIterator, Long> iteratorMaxTimes = Maps.newHashMapWithExpectedSize(view.sstables.size());
             for (SSTableReader sstable : view.sstables)
             {
+                // if we've already seen a row tombstone with a timestamp greater
+                // than the most recent update to this sstable, we can skip it
+                if (sstable.getMaxTimestamp() < mostRecentRowTombstone)
+                    continue;
+
                 IColumnIterator iter = filter.getSSTableColumnIterator(sstable);
-                iterators.add(iter);
+                iteratorMaxTimes.put(iter, sstable.getMaxTimestamp());
                 if (iter.getColumnFamily() != null)
                 {
-                    returnCF.delete(iter.getColumnFamily());
+                    ColumnFamily cf = iter.getColumnFamily();
+                    if (cf.isMarkedForDelete())
+                        mostRecentRowTombstone = cf.getMarkedForDeleteAt();
+
+                    returnCF.delete(cf);
                     sstablesIterated++;
                 }
             }
+            
+            // If we saw a row tombstone, do a second pass through the iterators we
+            // obtained from the sstables and drop any whose maxTimestamp < that of the
+            // row tombstone
+            for (Map.Entry<IColumnIterator, Long> entry : iteratorMaxTimes.entrySet())
+            {
+                if (entry.getValue() >= mostRecentRowTombstone)
+                    iterators.add(entry.getKey());
+            }
 
             // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
             // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a04d854/test/unit/org/apache/cassandra/db/CollationControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CollationControllerTest.java b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
new file mode 100644
index 0000000..f469639
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
@@ -0,0 +1,83 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.junit.Test;
+
+public class CollationControllerTest extends SchemaLoader
+{
+    @Test
+    public void getTopLevelColumnsSkipsSSTablesModifiedBeforeRowDelete() 
+            throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+        RowMutation rm;
+        DecoratedKey dk = Util.dk("key1");
+        QueryPath path = new QueryPath("Standard1", null, ByteBufferUtil.bytes("Column1"));
+        
+        // add data
+        rm = new RowMutation("Keyspace1", dk.key);
+        rm.add(path, ByteBufferUtil.bytes("asdf"), 0);
+        rm.apply();
+        store.forceBlockingFlush();
+        
+        // remove
+        rm = new RowMutation("Keyspace1", dk.key);
+        rm.delete(new QueryPath("Standard1"), 10);
+        rm.apply();
+        
+        // add another mutation because sstable maxtimestamp isn't set
+        // correctly during flush if the most recent mutation is a row delete
+        rm = new RowMutation("Keyspace1", Util.dk("key2").key);
+        rm.add(path, ByteBufferUtil.bytes("zxcv"), 20);
+        rm.apply();
+        
+        store.forceBlockingFlush();
+
+        // A NamesQueryFilter goes down one code path (through collectTimeOrderedData())
+        QueryFilter filter = QueryFilter.getNamesFilter(dk, path, ByteBufferUtil.bytes("Column1"));
+        CollationController controller = new CollationController(store, false, filter, Integer.MIN_VALUE);
+        controller.getTopLevelColumns();
+        assertEquals(1, controller.getSstablesIterated());
+        
+        // SliceQueryFilter goes down another path (through collectAllData())
+        // Add another mutation, with a lower timestamp then force another flush
+        // so we can assert that we're not reading every sstable 
+        rm = new RowMutation("Keyspace1", dk.key);
+        rm.add(path, ByteBufferUtil.bytes("asdf"), 5);
+        rm.apply();
+        store.forceBlockingFlush();
+        
+        filter = QueryFilter.getIdentityFilter(dk, path);
+        controller = new CollationController(store, false, filter, Integer.MIN_VALUE);
+        controller.getTopLevelColumns();
+        assertEquals(2, controller.getSstablesIterated());
+    }
+}

Reply via email to