Repository: cassandra Updated Branches: refs/heads/trunk 415503353 -> 7d8ba3be5
Account for range tombstones in min/max column names Patch by Oleg Anastasyev, reviewed by marcuse for CASSANDRA-7235 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/303ff22d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/303ff22d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/303ff22d Branch: refs/heads/trunk Commit: 303ff22dd608d4971a12de52f91184dcd82895c0 Parents: dd87228 Author: Marcus Eriksson <marc...@apache.org> Authored: Thu Jun 19 08:50:27 2014 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Jun 19 08:50:27 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamily.java | 3 + .../db/compaction/LazilyCompactedRow.java | 11 +-- .../cassandra/io/sstable/SSTableWriter.java | 3 + .../apache/cassandra/db/ColumnFamilyTest.java | 12 +++ .../db/compaction/CompactionsTest.java | 97 +++++++++++++++++++- 6 files changed, 120 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 16e0531..65e3161 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,7 @@ * Make StreamSession#closeSession() idempotent (CASSANDRA-7262) * Fix infinite loop on exception while streaming (CASSANDRA-7330) * Reference sstables before populating key cache (CASSANDRA-7234) + * Account for range tombstones in min/max column names (CASSANDRA-7235) Merged from 1.2: * cqlsh: ignore .cassandra permission errors (CASSANDRA-7266) * Errors in FlushRunnable may leave threads hung (CASSANDRA-7275) http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/src/java/org/apache/cassandra/db/ColumnFamily.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index ec6a395..638eacc 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -426,6 +426,9 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry { RangeTombstone rangeTombstone = it.next(); tombstones.update(rangeTombstone.getLocalDeletionTime()); + + minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, rangeTombstone.min, metadata.comparator); + maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, rangeTombstone.max, metadata.comparator); } for (Column column : this) http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index e10fb2c..7cd0842 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -252,6 +252,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable } else { + tombstones.update(t.getLocalDeletionTime()); + + minColumnNameSeen = ColumnNameHelper.minComponents(minColumnNameSeen, t.min, controller.cfs.metadata.comparator); + maxColumnNameSeen = ColumnNameHelper.maxComponents(maxColumnNameSeen, t.max, controller.cfs.metadata.comparator); + return t; } } @@ -278,12 +283,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable int localDeletionTime = purged.deletionInfo().getTopLevelDeletion().localDeletionTime; if (localDeletionTime < Integer.MAX_VALUE) 
tombstones.update(localDeletionTime); - Iterator<RangeTombstone> rangeTombstoneIterator = purged.deletionInfo().rangeIterator(); - while (rangeTombstoneIterator.hasNext()) - { - RangeTombstone rangeTombstone = rangeTombstoneIterator.next(); - tombstones.update(rangeTombstone.getLocalDeletionTime()); - } columns++; minTimestampSeen = Math.min(minTimestampSeen, reduced.minTimestamp()); maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 6528ced..3a2dca0 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -243,6 +243,9 @@ public class SSTableWriter extends SSTable { RangeTombstone rangeTombstone = rangeTombstoneIterator.next(); tombstones.update(rangeTombstone.getLocalDeletionTime()); + + minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator); + maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator); } Iterator<OnDiskAtom> iter = metadata.getOnDiskIterator(in, columnCount, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java index a01c25c..e13d0d7 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java @@ -163,5 +163,17 @@ public class 
ColumnFamilyTest extends SchemaLoader cf.delete(new DeletionInfo(timestamp, localDeletionTime)); ColumnStats stats = cf.getColumnStats(); assertEquals(timestamp, stats.maxTimestamp); + + cf.delete(new RangeTombstone(ByteBufferUtil.bytes("col2"), ByteBufferUtil.bytes("col21"), timestamp, localDeletionTime)); + + stats = cf.getColumnStats(); + assertEquals(ByteBufferUtil.bytes("col2"), stats.minColumnNames.get(0)); + assertEquals(ByteBufferUtil.bytes("col21"), stats.maxColumnNames.get(0)); + + cf.delete(new RangeTombstone(ByteBufferUtil.bytes("col6"), ByteBufferUtil.bytes("col61"), timestamp, localDeletionTime)); + stats = cf.getColumnStats(); + + assertEquals(ByteBufferUtil.bytes("col2"), stats.minColumnNames.get(0)); + assertEquals(ByteBufferUtil.bytes("col61"), stats.maxColumnNames.get(0)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 98eacbf..1879838 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -27,12 +27,14 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; + import org.junit.Test; import org.junit.runner.RunWith; - import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.filter.QueryFilter; @@ -41,8 +43,11 @@ import 
org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.SSTableMetadata; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; +import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -344,6 +349,96 @@ public class CompactionsTest extends SchemaLoader } @Test + public void testRangeTombstones() throws IOException, ExecutionException, InterruptedException + { + boolean lazy = false; + + do + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard2"); + cfs.clearUnsafe(); + + // disable compaction while flushing + cfs.disableAutoCompaction(); + + final CFMetaData cfmeta = cfs.metadata; + Directories dir = Directories.create(cfmeta.ksName, cfmeta.cfName); + + ArrayList<DecoratedKey> keys = new ArrayList<DecoratedKey>(); + + for (int i=0; i < 4; i++) + { + keys.add(Util.dk(""+i)); + } + + ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfmeta); + cf.addColumn(Util.column("01", "a", 1)); // this must not resurrect + cf.addColumn(Util.column("a", "a", 3)); + cf.deletionInfo().add(new RangeTombstone(ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("b"), 2, (int) (System.currentTimeMillis()/1000)),cfmeta.comparator); + + SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables()), + 0, + cfs.metadata, + StorageService.getPartitioner(), + SSTableMetadata.createCollector(cfs.metadata.comparator)); + + + writer.append(Util.dk("0"), cf); + writer.append(Util.dk("1"), cf); + writer.append(Util.dk("3"), cf); + + cfs.addSSTable(writer.closeAndOpenReader()); + writer = new 
SSTableWriter(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables()), + 0, + cfs.metadata, + StorageService.getPartitioner(), + SSTableMetadata.createCollector(cfs.metadata.comparator)); + + writer.append(Util.dk("0"), cf); + writer.append(Util.dk("1"), cf); + writer.append(Util.dk("2"), cf); + writer.append(Util.dk("3"), cf); + cfs.addSSTable(writer.closeAndOpenReader()); + + Collection<SSTableReader> toCompact = cfs.getSSTables(); + assert toCompact.size() == 2; + + // forcing lazy compaction + if (lazy) + DatabaseDescriptor.setInMemoryCompactionLimit(0); + + // Force compaction on first sstables. Since each row is in only one sstable, we will be using EchoedRow. + Util.compact(cfs, toCompact); + assertEquals(1, cfs.getSSTables().size()); + + // Now assert we do have the 4 keys + assertEquals(4, Util.getRangeSlice(cfs).size()); + + ArrayList<DecoratedKey> k = new ArrayList<DecoratedKey>(); + for (Row r : Util.getRangeSlice(cfs)) + { + k.add(r.key); + assertEquals(ByteBufferUtil.bytes("a"),r.cf.getColumn(ByteBufferUtil.bytes("a")).value()); + assertNull(r.cf.getColumn(ByteBufferUtil.bytes("01"))); + assertEquals(3,r.cf.getColumn(ByteBufferUtil.bytes("a")).timestamp()); + } + + for (SSTableReader sstable : cfs.getSSTables()) + { + SSTableMetadata stats = sstable.getSSTableMetadata(); + assertEquals(ByteBufferUtil.bytes("0"), stats.minColumnNames.get(0)); + assertEquals(ByteBufferUtil.bytes("b"), stats.maxColumnNames.get(0)); + } + + assertEquals(keys, k); + + lazy=!lazy; + } + while (lazy); + } + + @Test public void testCompactionLog() throws Exception { SystemKeyspace.discardCompactionsInProgress();