Optimize reads when row deletion timestamps allow us to restrict the sstables we check. Patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-4116
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a04d854 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a04d854 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a04d854 Branch: refs/heads/trunk Commit: 3a04d85488b32c106ccfec76092b187f5728b3dd Parents: 116d8a1 Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue May 1 23:26:17 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue May 1 23:26:17 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/CollationController.java | 48 ++++++++- .../cassandra/db/CollationControllerTest.java | 83 +++++++++++++++ 3 files changed, 127 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a04d854/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 918c146..c37dd93 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 1.1.1-dev + * Optimize reads when row deletion timestamps allow us to restrict + the set of sstables we check (CASSANDRA-4116) * incremental repair by token range (CASSANDRA-3912) * streaming commitlog backup + pitr (CASSANDRA-3690) * avoid generating redundant compaction tasks during streaming http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a04d854/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index 5289d1b..c73e1e0 100644 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.nio.ByteBuffer; 
import java.util.*; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,8 +106,15 @@ public class CollationController Collections.sort(view.sstables, SSTable.maxTimestampComparator); // read sorted sstables + long mostRecentRowTombstone = Long.MIN_VALUE; for (SSTableReader sstable : view.sstables) { + // if we've already seen a row tombstone with a timestamp greater + // than the most recent update to this sstable, we're done, since the rest of the sstables + // will also be older + if (sstable.getMaxTimestamp() < mostRecentRowTombstone) + break; + long currentMaxTs = sstable.getMaxTimestamp(); reduceNameFilter(reducedFilter, container, currentMaxTs); if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty()) @@ -115,7 +124,14 @@ public class CollationController iterators.add(iter); if (iter.getColumnFamily() != null) { - container.delete(iter.getColumnFamily()); + ColumnFamily cf = iter.getColumnFamily(); + if (cf.isMarkedForDelete()) + { + // track the most recent row level tombstone we encounter + mostRecentRowTombstone = cf.getMarkedForDeleteAt(); + } + + container.delete(cf); sstablesIterated++; while (iter.hasNext()) container.addColumn(iter.next()); @@ -212,10 +228,10 @@ public class CollationController ISortedColumns.Factory factory = mutableColumns ? cfs.metadata.cfType == ColumnFamilyType.Super ? 
ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory() : ArrayBackedSortedColumns.factory(); - List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); + ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key); + List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(Iterables.size(view.memtables) + view.sstables.size()); ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); - ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key); try { for (Memtable memtable : view.memtables) @@ -227,17 +243,37 @@ public class CollationController iterators.add(iter); } } - + + long mostRecentRowTombstone = Long.MIN_VALUE; + Map<IColumnIterator, Long> iteratorMaxTimes = Maps.newHashMapWithExpectedSize(view.sstables.size()); for (SSTableReader sstable : view.sstables) { + // if we've already seen a row tombstone with a timestamp greater + // than the most recent update to this sstable, we can skip it + if (sstable.getMaxTimestamp() < mostRecentRowTombstone) + continue; + IColumnIterator iter = filter.getSSTableColumnIterator(sstable); - iterators.add(iter); + iteratorMaxTimes.put(iter, sstable.getMaxTimestamp()); if (iter.getColumnFamily() != null) { - returnCF.delete(iter.getColumnFamily()); + ColumnFamily cf = iter.getColumnFamily(); + if (cf.isMarkedForDelete()) + mostRecentRowTombstone = cf.getMarkedForDeleteAt(); + + returnCF.delete(cf); sstablesIterated++; } } + + // If we saw a row tombstone, do a second pass through the iterators we + // obtained from the sstables and drop any whose maxTimestamp < that of the + // row tombstone + for (Map.Entry<IColumnIterator, Long> entry : iteratorMaxTimes.entrySet()) + { + if (entry.getValue() >= mostRecentRowTombstone) + iterators.add(entry.getKey()); + } // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently) // and "there used to be data, but it's gone now" (we should cache the 
empty CF so we don't need to rebuild that slower) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a04d854/test/unit/org/apache/cassandra/db/CollationControllerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CollationControllerTest.java b/test/unit/org/apache/cassandra/db/CollationControllerTest.java new file mode 100644 index 0000000..f469639 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/CollationControllerTest.java @@ -0,0 +1,83 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.db; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.junit.Test; + +public class CollationControllerTest extends SchemaLoader +{ + @Test + public void getTopLevelColumnsSkipsSSTablesModifiedBeforeRowDelete() + throws IOException, ExecutionException, InterruptedException + { + Table table = Table.open("Keyspace1"); + ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); + RowMutation rm; + DecoratedKey dk = Util.dk("key1"); + QueryPath path = new QueryPath("Standard1", null, ByteBufferUtil.bytes("Column1")); + + // add data + rm = new RowMutation("Keyspace1", dk.key); + rm.add(path, ByteBufferUtil.bytes("asdf"), 0); + rm.apply(); + store.forceBlockingFlush(); + + // remove + rm = new RowMutation("Keyspace1", dk.key); + rm.delete(new QueryPath("Standard1"), 10); + rm.apply(); + + // add another mutation because sstable maxtimestamp isn't set + // correctly during flush if the most recent mutation is a row delete + rm = new RowMutation("Keyspace1", Util.dk("key2").key); + rm.add(path, ByteBufferUtil.bytes("zxcv"), 20); + rm.apply(); + + store.forceBlockingFlush(); + + // A NamesQueryFilter goes down one code path (through collectTimeOrderedData()) + QueryFilter filter = QueryFilter.getNamesFilter(dk, path, ByteBufferUtil.bytes("Column1")); + CollationController controller = new CollationController(store, false, filter, Integer.MIN_VALUE); + controller.getTopLevelColumns(); + assertEquals(1, controller.getSstablesIterated()); + + // SliceQueryFilter goes down another path (through collectAllData()) + // Add another mutation, with a lower timestamp then force another flush + // so we can assert that we're 
not reading every sstable + rm = new RowMutation("Keyspace1", dk.key); + rm.add(path, ByteBufferUtil.bytes("asdf"), 5); + rm.apply(); + store.forceBlockingFlush(); + + filter = QueryFilter.getIdentityFilter(dk, path); + controller = new CollationController(store, false, filter, Integer.MIN_VALUE); + controller.getTopLevelColumns(); + assertEquals(2, controller.getSstablesIterated()); + } +}