Fix sstable reader to support range-tombstone-marker for multi-slices

patch by Zhao Yang; reviewed by Sylvain Lebresne for CASSANDRA-13787
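
For context on the failure mode: when a single-partition command carries several slices and a range tombstone is still open at the end of one slice, the old reader cleared its open-marker state while emitting the close bound, so the remaining slices of the same command no longer saw the deletion. A condensed sketch of that read shape, distilled from the new SinglePartitionSliceCommandTest further down (every name here, CFM_SLICES included, comes from that test's setup, so treat this as a sketch rather than a standalone program):

    // Two slices over the same partition; a range tombstone on c1 = 1 covers both.
    Slices.Builder slices = new Slices.Builder(CFM_SLICES.comparator);
    slices.add(Slice.make(Util.clustering(CFM_SLICES.comparator, 1, 0)));
    slices.add(Slice.make(Util.clustering(CFM_SLICES.comparator, 1, 1)));

    ReadCommand cmd = SinglePartitionReadCommand.create(CFM_SLICES,
                                                        FBUtilities.nowInSeconds(),
                                                        ColumnFilter.all(CFM_SLICES),
                                                        RowFilter.NONE,
                                                        DataLimits.NONE,
                                                        CFM_SLICES.decorateKey(ByteBufferUtil.bytes("k")),
                                                        new ClusteringIndexSliceFilter(slices.build(), false));

    // Once the tombstone has been flushed to an sstable, every slice that intersects it
    // should come back as an open/close marker pair; before this patch only the first
    // slice did, and the deletion was silently dropped for the rest.
    UnfilteredPartitionIterator partitions = cmd.executeLocally(ReadOrderGroup.emptyGroup());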


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/975c3d81
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/975c3d81
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/975c3d81

Branch: refs/heads/cassandra-3.11
Commit: 975c3d81b67e9c1e1dcefdda3f90e8edf6be5efa
Parents: 35e32f2
Author: Zhao Yang <zhaoyangsingap...@gmail.com>
Authored: Wed Aug 23 16:15:25 2017 +0800
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Wed Sep 20 15:09:49 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../columniterator/AbstractSSTableIterator.java |   7 -
 .../db/columniterator/SSTableIterator.java      |   8 +-
 .../columniterator/SSTableReversedIterator.java |   2 +-
 .../org/apache/cassandra/cql3/ViewTest.java     |  49 +++++++
 .../db/SinglePartitionSliceCommandTest.java     | 145 ++++++++++++++++++-
 6 files changed, 197 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 74e70e1..2d11a3e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)
  * Fix short read protection for tables with no clustering columns (CASSANDRA-13880)
  * Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619)
  * Prevent integer overflow of timestamps in CellTest and RowsTest (CASSANDRA-13866)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index c61b6aa..f9e6545 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -329,13 +329,6 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
         }
 
-        protected DeletionTime getAndClearOpenMarker()
-        {
-            DeletionTime toReturn = openMarker;
-            openMarker = null;
-            return toReturn;
-        }
-
         public boolean hasNext() 
         {
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index ff91871..47f85ac 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -173,14 +173,14 @@ public class SSTableIterator extends AbstractSSTableIterator
             if (next != null)
                 return true;
 
-            // If we have an open marker, we should close it before finishing
+            // no data was read from deserialization for the current slice
+            sliceDone = true;
+            // If we have an open marker, emit its close bound but do not clear it: there could be more slices
             if (openMarker != null)
             {
-                next = new RangeTombstoneBoundMarker(end, getAndClearOpenMarker());
+                next = new RangeTombstoneBoundMarker(end, openMarker);
                 return true;
             }
-
-            sliceDone = true; // not absolutely necessary but accurate and cheap
             return false;
         }
 

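The idea behind keeping openMarker, as a minimal standalone sketch (toy class and names, not Cassandra's; it only models the bookkeeping): a range tombstone that spans several queried slices must be re-opened at each slice start and closed at each slice end, which is only possible if the reader still remembers the open deletion after it finishes a slice.

    import java.util.ArrayList;
    import java.util.List;

    // Toy model: one range tombstone over clusterings [0, 9] with deletion timestamp 10,
    // read through consecutive slices. Not Cassandra code, just the marker bookkeeping.
    public class OpenMarkerSketch
    {
        private Long openMarker;                      // deletion time of the open tombstone, if any
        private final int rtStart = 0, rtEnd = 9;     // bounds of the (single) range tombstone
        private final long rtTimestamp = 10L;

        List<String> readSlice(int start, int end)
        {
            List<String> emitted = new ArrayList<>();
            if (rtStart >= start && rtStart <= end)   // the tombstone opens inside this slice
                openMarker = rtTimestamp;
            if (openMarker != null)                   // (re-)open the deletion for this slice
                emitted.add("open@" + Math.max(start, rtStart) + " ts=" + openMarker);
            // ... live rows inside the slice would be emitted here ...
            if (openMarker != null)
            {
                emitted.add("close@" + Math.min(end, rtEnd) + " ts=" + openMarker);
                // Keep openMarker unless the tombstone really ends inside this slice.
                // Clearing it unconditionally (the old behaviour) makes the next
                // readSlice() call miss the deletion entirely.
                if (rtEnd <= end)
                    openMarker = null;
            }
            return emitted;
        }

        public static void main(String[] args)
        {
            OpenMarkerSketch reader = new OpenMarkerSketch();
            System.out.println(reader.readSlice(0, 4));   // [open@0 ts=10, close@4 ts=10]
            System.out.println(reader.readSlice(5, 9));   // [open@5 ts=10, close@9 ts=10]
        }
    }
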
http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index b12ed67..76d8c4d 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -240,7 +240,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                 // skipFirstIteratedItem (this is the last item of the block, but we're iterating in reverse order so it will
                 // be the first returned by the iterator).
                 RangeTombstone.Bound markerEnd = end == null ? RangeTombstone.Bound.TOP : RangeTombstone.Bound.fromSliceBound(end);
-                buffer.add(new RangeTombstoneBoundMarker(markerEnd, getAndClearOpenMarker()));
+                buffer.add(new RangeTombstoneBoundMarker(markerEnd, openMarker));
                 if (hasPreviousBlock)
                     skipFirstIteratedItem = true;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 61f4b4a..84b2773 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.FBUtilities;
 
+
 public class ViewTest extends CQLTester
 {
     int protocolVersion = 4;
@@ -99,6 +100,54 @@ public class ViewTest extends CQLTester
     }
 
     @Test
+    public void testExistingRangeTombstoneWithFlush() throws Throwable
+    {
+        testExistingRangeTombstone(true);
+    }
+
+    @Test
+    public void testExistingRangeTombstoneWithoutFlush() throws Throwable
+    {
+        testExistingRangeTombstone(false);
+    }
+
+    public void testExistingRangeTombstone(boolean flush) throws Throwable
+    {
+        createTable("CREATE TABLE %s (k1 int, c1 int, c2 int, v1 int, v2 int, PRIMARY KEY (k1, c1, c2))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("view1",
+                   "CREATE MATERIALIZED VIEW view1 AS SELECT * FROM %%s WHERE 
k1 IS NOT NULL AND c1 IS NOT NULL AND c2 IS NOT NULL PRIMARY KEY (k1, c2, c1)");
+
+        updateView("DELETE FROM %s USING TIMESTAMP 10 WHERE k1 = 1 and c1=1");
+
+        if (flush)
+            Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
+
+        String table = KEYSPACE + "." + currentTable();
+        updateView("BEGIN BATCH " +
+                "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 0, 0, 0, 0) USING TIMESTAMP 5; " +
+                "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 0, 1, 0, 1) USING TIMESTAMP 5; " +
+                "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 1, 0, 1, 0) USING TIMESTAMP 5; " +
+                "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 1, 1, 1, 1) USING TIMESTAMP 5; " +
+                "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 1, 2, 1, 2) USING TIMESTAMP 5; " +
+                "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 1, 3, 1, 3) USING TIMESTAMP 5; " +
+                "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 2, 0, 2, 0) USING TIMESTAMP 5; " +
+                "APPLY BATCH");
+
+        assertRowsIgnoringOrder(execute("select * from %s"),
+                                row(1, 0, 0, 0, 0),
+                                row(1, 0, 1, 0, 1),
+                                row(1, 2, 0, 2, 0));
+        assertRowsIgnoringOrder(execute("select k1,c1,c2,v1,v2 from view1"),
+                                row(1, 0, 0, 0, 0),
+                                row(1, 0, 1, 0, 1),
+                                row(1, 2, 0, 2, 0));
+    }
+
+    @Test
     public void testPartitionTombstone() throws Throwable
     {
         createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY 
(k1, c1))");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 02b642e..b1a374f 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -20,11 +20,12 @@
  */
 package org.apache.cassandra.db;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Iterator;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -32,23 +33,28 @@ import org.junit.Test;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.filter.AbstractClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -58,6 +64,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.btree.BTreeSet;
 
 public class SinglePartitionSliceCommandTest
 {
@@ -70,6 +77,9 @@ public class SinglePartitionSliceCommandTest
     private static ColumnDefinition v;
     private static ColumnDefinition s;
 
+    private static final String TABLE_SCLICES = "tbl_slices";
+    private static CFMetaData CFM_SLICES;
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -80,17 +90,28 @@ public class SinglePartitionSliceCommandTest
                                 .addRegularColumn("v", UTF8Type.instance)
                                 .build();
 
+        CFM_SLICES = CFMetaData.Builder.create(KEYSPACE, TABLE_SCLICES)
+                                       .addPartitionKey("k", UTF8Type.instance)
+                                       .addClusteringColumn("c1", Int32Type.instance)
+                                       .addClusteringColumn("c2", Int32Type.instance)
+                                       .addRegularColumn("v", IntegerType.instance)
+                                       .build();
+
         SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm);
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm, CFM_SLICES);
+
         cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
         v = cfm.getColumnDefinition(new ColumnIdentifier("v", true));
         s = cfm.getColumnDefinition(new ColumnIdentifier("s", true));
+
+        CFM_SLICES = Schema.instance.getCFMetaData(KEYSPACE, TABLE_SCLICES);
     }
 
     @Before
     public void truncate()
     {
         Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).truncateBlocking();
+        Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE_SCLICES).truncateBlocking();
     }
 
     @Test
@@ -143,6 +164,124 @@ public class SinglePartitionSliceCommandTest
         Assert.assertEquals(Collections.emptyList(), rowIter.cells);
     }
 
+    @Test
+    public void testMultiNamesCommandWithFlush()
+    {
+        testMultiNamesOrSlicesCommand(true, false);
+    }
+
+    @Test
+    public void testMultiNamesCommandWithoutFlush()
+    {
+        testMultiNamesOrSlicesCommand(false, false);
+    }
+
+    @Test
+    public void testMultiSlicesCommandWithFlush()
+    {
+        testMultiNamesOrSlicesCommand(true, true);
+    }
+
+    @Test
+    public void testMultiSlicesCommandWithoutFlush()
+    {
+        testMultiNamesOrSlicesCommand(false, true);
+    }
+
+    private AbstractClusteringIndexFilter createClusteringFilter(int uniqueCk1, int uniqueCk2, boolean isSlice)
+    {
+        Slices.Builder slicesBuilder = new Slices.Builder(CFM_SLICES.comparator);
+        BTreeSet.Builder<Clustering> namesBuilder = BTreeSet.builder(CFM_SLICES.comparator);
+
+        for (int ck1 = 0; ck1 < uniqueCk1; ck1++)
+        {
+            for (int ck2 = 0; ck2 < uniqueCk2; ck2++)
+            {
+                if (isSlice)
+                    slicesBuilder.add(Slice.make(Util.clustering(CFM_SLICES.comparator, ck1, ck2)));
+                else
+                    namesBuilder.add(Util.clustering(CFM_SLICES.comparator, ck1, ck2));
+            }
+        }
+        if (isSlice)
+            return new ClusteringIndexSliceFilter(slicesBuilder.build(), false);
+        return new ClusteringIndexNamesFilter(namesBuilder.build(), false);
+    }
+
+    private void testMultiNamesOrSlicesCommand(boolean flush, boolean isSlice)
+    {
+        boolean isTombstone = flush || isSlice;
+        int deletionTime = 5;
+        int ck1 = 1;
+        int uniqueCk1 = 2;
+        int uniqueCk2 = 3;
+
+        DecoratedKey key = CFM_SLICES.decorateKey(ByteBufferUtil.bytes("k"));
+        QueryProcessor.executeInternal(String.format("DELETE FROM ks.tbl_slices USING TIMESTAMP %d WHERE k='k' AND c1=%d",
+                                                     deletionTime,
+                                                     ck1));
+
+        if (flush)
+            Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE_SCLICES).forceBlockingFlush();
+
+        AbstractClusteringIndexFilter clusteringFilter = createClusteringFilter(uniqueCk1, uniqueCk2, isSlice);
+        ReadCommand cmd = SinglePartitionReadCommand.create(CFM_SLICES,
+                                                            FBUtilities.nowInSeconds(),
+                                                            ColumnFilter.all(CFM_SLICES),
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            clusteringFilter);
+
+        UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup());
+        assert partitionIterator.hasNext();
+        UnfilteredRowIterator partition = partitionIterator.next();
+
+        int count = 0;
+        boolean open = true;
+        while (partition.hasNext())
+        {
+            Unfiltered unfiltered = partition.next();
+            if (isTombstone)
+            {
+                assertTrue(unfiltered.isRangeTombstoneMarker());
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+
+                // check that open and close markers alternate
+                assertTrue(marker.isOpen(false) == open);
+                // check that the deletion time matches the range deletion
+                if (open)
+                    assertEquals(deletionTime, marker.openDeletionTime(false).markedForDeleteAt());
+                else
+                    assertEquals(deletionTime, marker.closeDeletionTime(false).markedForDeleteAt());
+
+                // check clustering values
+                Clustering clustering = Util.clustering(CFM_SLICES.comparator, ck1, count / 2);
+                for (int i = 0; i < CFM_SLICES.comparator.size(); i++)
+                {
+                    int cmp = CFM_SLICES.comparator.compareComponent(i,
+                                                                     clustering.values[i],
+                                                                     marker.clustering().values[i]);
+                    assertEquals(0, cmp);
+                }
+                open = !open;
+            }
+            else
+            {
+                // deleted row
+                assertTrue(unfiltered.isRow());
+                Row row = (Row) unfiltered;
+                assertEquals(deletionTime, row.deletion().time().markedForDeleteAt());
+                assertEquals(0, row.size()); // no btree
+            }
+            count++;
+        }
+        if (isTombstone)
+            assertEquals(uniqueCk2 * 2, count); // open and close range tombstones
+        else
+            assertEquals(uniqueCk2, count);
+    }
+
     private void checkForS(UnfilteredPartitionIterator pi)
     {
         Assert.assertTrue(pi.toString(), pi.hasNext());

