Repository: cassandra
Updated Branches:
  refs/heads/trunk 415503353 -> 7d8ba3be5


Account for range tombstones in min/max column names

Patch by Oleg Anastasyev, reviewed by marcuse for CASSANDRA-7235


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/303ff22d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/303ff22d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/303ff22d

Branch: refs/heads/trunk
Commit: 303ff22dd608d4971a12de52f91184dcd82895c0
Parents: dd87228
Author: Marcus Eriksson <marc...@apache.org>
Authored: Thu Jun 19 08:50:27 2014 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Thu Jun 19 08:50:27 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ColumnFamily.java   |  3 +
 .../db/compaction/LazilyCompactedRow.java       | 11 +--
 .../cassandra/io/sstable/SSTableWriter.java     |  3 +
 .../apache/cassandra/db/ColumnFamilyTest.java   | 12 +++
 .../db/compaction/CompactionsTest.java          | 97 +++++++++++++++++++-
 6 files changed, 120 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16e0531..65e3161 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
  * Fix infinite loop on exception while streaming (CASSANDRA-7330)
  * Reference sstables before populating key cache (CASSANDRA-7234)
+ * Account for range tombstones in min/max column names (CASSANDRA-7235)
 Merged from 1.2:
  * cqlsh: ignore .cassandra permission errors (CASSANDRA-7266)
  * Errors in FlushRunnable may leave threads hung (CASSANDRA-7275)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java 
b/src/java/org/apache/cassandra/db/ColumnFamily.java
index ec6a395..638eacc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -426,6 +426,9 @@ public abstract class ColumnFamily implements 
Iterable<Column>, IRowCacheEntry
         {
             RangeTombstone rangeTombstone = it.next();
             tombstones.update(rangeTombstone.getLocalDeletionTime());
+
+            minColumnNamesSeen = 
ColumnNameHelper.minComponents(minColumnNamesSeen, rangeTombstone.min, 
metadata.comparator);
+            maxColumnNamesSeen = 
ColumnNameHelper.maxComponents(maxColumnNamesSeen, rangeTombstone.max, 
metadata.comparator);
         }
 
         for (Column column : this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index e10fb2c..7cd0842 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -252,6 +252,11 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements Iterable
                 }
                 else
                 {
+                    tombstones.update(t.getLocalDeletionTime());
+
+                    minColumnNameSeen = 
ColumnNameHelper.minComponents(minColumnNameSeen, t.min, 
controller.cfs.metadata.comparator);
+                    maxColumnNameSeen = 
ColumnNameHelper.maxComponents(maxColumnNameSeen, t.max, 
controller.cfs.metadata.comparator);
+
                     return t;
                 }
             }
@@ -278,12 +283,6 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements Iterable
                 int localDeletionTime = 
purged.deletionInfo().getTopLevelDeletion().localDeletionTime;
                 if (localDeletionTime < Integer.MAX_VALUE)
                     tombstones.update(localDeletionTime);
-                Iterator<RangeTombstone> rangeTombstoneIterator = 
purged.deletionInfo().rangeIterator();
-                while (rangeTombstoneIterator.hasNext())
-                {
-                    RangeTombstone rangeTombstone = 
rangeTombstoneIterator.next();
-                    tombstones.update(rangeTombstone.getLocalDeletionTime());
-                }
                 columns++;
                 minTimestampSeen = Math.min(minTimestampSeen, 
reduced.minTimestamp());
                 maxTimestampSeen = Math.max(maxTimestampSeen, 
reduced.maxTimestamp());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 6528ced..3a2dca0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -243,6 +243,9 @@ public class SSTableWriter extends SSTable
         {
             RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
             tombstones.update(rangeTombstone.getLocalDeletionTime());
+
+            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, 
rangeTombstone.min, metadata.comparator);
+            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, 
rangeTombstone.max, metadata.comparator);
         }
 
         Iterator<OnDiskAtom> iter = metadata.getOnDiskIterator(in, 
columnCount, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java 
b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
index a01c25c..e13d0d7 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
@@ -163,5 +163,17 @@ public class ColumnFamilyTest extends SchemaLoader
         cf.delete(new DeletionInfo(timestamp, localDeletionTime));
         ColumnStats stats = cf.getColumnStats();
         assertEquals(timestamp, stats.maxTimestamp);
+
+        cf.delete(new RangeTombstone(ByteBufferUtil.bytes("col2"), 
ByteBufferUtil.bytes("col21"), timestamp, localDeletionTime));
+
+        stats = cf.getColumnStats();
+        assertEquals(ByteBufferUtil.bytes("col2"), 
stats.minColumnNames.get(0));
+        assertEquals(ByteBufferUtil.bytes("col21"), 
stats.maxColumnNames.get(0));
+
+        cf.delete(new RangeTombstone(ByteBufferUtil.bytes("col6"), 
ByteBufferUtil.bytes("col61"), timestamp, localDeletionTime));
+        stats = cf.getColumnStats();
+
+        assertEquals(ByteBufferUtil.bytes("col2"), 
stats.minColumnNames.get(0));
+        assertEquals(ByteBufferUtil.bytes("col61"), 
stats.maxColumnNames.get(0));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/303ff22d/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 98eacbf..1879838 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -27,12 +27,14 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -41,8 +43,11 @@ import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -344,6 +349,96 @@ public class CompactionsTest extends SchemaLoader
     }
 
     @Test
+    public void testRangeTombstones() throws IOException, ExecutionException, 
InterruptedException
+    {
+        boolean lazy = false;
+
+        do
+        {
+            Keyspace keyspace = Keyspace.open(KEYSPACE1);
+            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard2");
+            cfs.clearUnsafe();
+
+            // disable compaction while flushing
+            cfs.disableAutoCompaction();
+
+            final CFMetaData cfmeta = cfs.metadata;
+            Directories dir = Directories.create(cfmeta.ksName, cfmeta.cfName);
+
+            ArrayList<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+
+            for (int i=0; i < 4; i++)
+            {
+                keys.add(Util.dk(""+i));
+            }
+
+            ArrayBackedSortedColumns cf = 
ArrayBackedSortedColumns.factory.create(cfmeta);
+            cf.addColumn(Util.column("01", "a", 1)); // this must not resurrect
+            cf.addColumn(Util.column("a", "a", 3));
+            cf.deletionInfo().add(new 
RangeTombstone(ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("b"), 2, (int) 
(System.currentTimeMillis()/1000)),cfmeta.comparator);
+
+            SSTableWriter writer = new 
SSTableWriter(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables()),
+                                                     0,
+                                                     cfs.metadata,
+                                                     
StorageService.getPartitioner(),
+                                                     
SSTableMetadata.createCollector(cfs.metadata.comparator));
+
+
+            writer.append(Util.dk("0"), cf);
+            writer.append(Util.dk("1"), cf);
+            writer.append(Util.dk("3"), cf);
+
+            cfs.addSSTable(writer.closeAndOpenReader());
+            writer = new 
SSTableWriter(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables()),
+                                       0,
+                                       cfs.metadata,
+                                       StorageService.getPartitioner(),
+                                       
SSTableMetadata.createCollector(cfs.metadata.comparator));
+
+            writer.append(Util.dk("0"), cf);
+            writer.append(Util.dk("1"), cf);
+            writer.append(Util.dk("2"), cf);
+            writer.append(Util.dk("3"), cf);
+            cfs.addSSTable(writer.closeAndOpenReader());
+
+            Collection<SSTableReader> toCompact = cfs.getSSTables();
+            assert toCompact.size() == 2;
+
+            // force lazy compaction
+            if (lazy)
+                DatabaseDescriptor.setInMemoryCompactionLimit(0);
+
+            // Force compaction of both sstables. Keys "0", "1" and "3" were written to both of them.
+            Util.compact(cfs, toCompact);
+            assertEquals(1, cfs.getSSTables().size());
+
+            // Now assert we do have the 4 keys
+            assertEquals(4, Util.getRangeSlice(cfs).size());
+
+            ArrayList<DecoratedKey> k = new ArrayList<DecoratedKey>();
+            for (Row r : Util.getRangeSlice(cfs))
+            {
+                k.add(r.key);
+                
assertEquals(ByteBufferUtil.bytes("a"),r.cf.getColumn(ByteBufferUtil.bytes("a")).value());
+                assertNull(r.cf.getColumn(ByteBufferUtil.bytes("01")));
+                
assertEquals(3,r.cf.getColumn(ByteBufferUtil.bytes("a")).timestamp());
+            }
+
+            for (SSTableReader sstable : cfs.getSSTables())
+            {
+                SSTableMetadata stats = sstable.getSSTableMetadata();
+                assertEquals(ByteBufferUtil.bytes("0"), 
stats.minColumnNames.get(0));
+                assertEquals(ByteBufferUtil.bytes("b"), 
stats.maxColumnNames.get(0));
+            }
+
+            assertEquals(keys, k);
+
+            lazy=!lazy;
+        }
+        while (lazy);
+    }
+
+    @Test
     public void testCompactionLog() throws Exception
     {
         SystemKeyspace.discardCompactionsInProgress();

Reply via email to