Fix sstable reader to support range-tombstone-marker for multi-slices

patch by Zhao Yang; reviewed by Sylvain Lebresne for CASSANDRA-13787
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/975c3d81
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/975c3d81
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/975c3d81

Branch: refs/heads/cassandra-3.11
Commit: 975c3d81b67e9c1e1dcefdda3f90e8edf6be5efa
Parents: 35e32f2
Author: Zhao Yang <zhaoyangsingap...@gmail.com>
Authored: Wed Aug 23 16:15:25 2017 +0800
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Wed Sep 20 15:09:49 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../columniterator/AbstractSSTableIterator.java |   7 -
 .../db/columniterator/SSTableIterator.java      |   8 +-
 .../columniterator/SSTableReversedIterator.java |   2 +-
 .../org/apache/cassandra/cql3/ViewTest.java     |  49 +++++++
 .../db/SinglePartitionSliceCommandTest.java     | 145 ++++++++++++++++++-
 6 files changed, 197 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 74e70e1..2d11a3e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)
  * Fix short read protection for tables with no clustering columns (CASSANDRA-13880)
  * Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619)
  * Prevent integer overflow of timestamps in CellTest and RowsTest (CASSANDRA-13866)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index c61b6aa..f9e6545 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -329,13 +329,6 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
         }

-        protected DeletionTime getAndClearOpenMarker()
-        {
-            DeletionTime toReturn = openMarker;
-            openMarker = null;
-            return toReturn;
-        }
-
         public boolean hasNext()
         {
             try


http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index ff91871..47f85ac 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -173,14 +173,14 @@ public class SSTableIterator extends AbstractSSTableIterator
             if (next != null)
                 return true;

-            // If we have an open marker, we should close it before finishing
+            // for current slice, no data read from deserialization
+            sliceDone = true;
+            // If we have an open marker, we should not close it, there could be more slices
             if (openMarker != null)
             {
-                next = new RangeTombstoneBoundMarker(end, getAndClearOpenMarker());
+                next = new RangeTombstoneBoundMarker(end, openMarker);
                 return true;
             }
-
-            sliceDone = true; // not absolutely necessary but accurate and cheap
             return false;
         }

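
A note on the SSTableIterator change above: the iterator now marks the current slice as done and emits a close bound for a still-open range deletion without clearing openMarker, so a later slice that also falls inside the same range tombstone can re-open it. The standalone sketch below illustrates that rule; it is plain Java with made-up names, not Cassandra code.

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of reading two slices that both fall inside one range tombstone.
    // All names here are illustrative; the real logic lives in SSTableIterator.
    public class OpenMarkerSketch
    {
        public static void main(String[] args)
        {
            Long openMarker = 5L;                 // deletion time currently "open"
            String[] sliceEnds = { "(c1=1, c2=0)", "(c1=1, c2=1)" };
            List<String> emitted = new ArrayList<>();

            for (String end : sliceEnds)
            {
                if (openMarker != null)
                {
                    emitted.add("open  at start of slice ending " + end + " @ " + openMarker);
                    emitted.add("close at " + end + " @ " + openMarker);
                    // Pre-CASSANDRA-13787 the open state was cleared at this point
                    // (getAndClearOpenMarker), so the next slice no longer saw the
                    // still-open deletion.
                }
            }
            emitted.forEach(System.out::println); // one open/close pair per slice
        }
    }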

http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index b12ed67..76d8c4d 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -240,7 +240,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                 // skipFirstIteratedItem (this is the last item of the block, but we're iterating in reverse order so it will
                 // be the first returned by the iterator).
                 RangeTombstone.Bound markerEnd = end == null ? RangeTombstone.Bound.TOP : RangeTombstone.Bound.fromSliceBound(end);
-                buffer.add(new RangeTombstoneBoundMarker(markerEnd, getAndClearOpenMarker()));
+                buffer.add(new RangeTombstoneBoundMarker(markerEnd, openMarker));
                 if (hasPreviousBlock)
                     skipFirstIteratedItem = true;
             }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 61f4b4a..84b2773 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.FBUtilities;

+
 public class ViewTest extends CQLTester
 {
     int protocolVersion = 4;
@@ -99,6 +100,54 @@ public class ViewTest extends CQLTester
     }

     @Test
+    public void testExistingRangeTombstoneWithFlush() throws Throwable
+    {
+        testExistingRangeTombstone(true);
+    }
+
+    @Test
+    public void testExistingRangeTombstoneWithoutFlush() throws Throwable
+    {
+        testExistingRangeTombstone(false);
+    }
+
+    public void testExistingRangeTombstone(boolean flush) throws Throwable
+    {
+        createTable("CREATE TABLE %s (k1 int, c1 int, c2 int, v1 int, v2 int, PRIMARY KEY (k1, c1, c2))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("view1",
+                   "CREATE MATERIALIZED VIEW view1 AS SELECT * FROM %%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND c2 IS NOT NULL PRIMARY KEY (k1, c2, c1)");
+
+        updateView("DELETE FROM %s USING TIMESTAMP 10 WHERE k1 = 1 and c1=1");
+
+        if (flush)
+            Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
+
+        String table = KEYSPACE + "." + currentTable();
+        updateView("BEGIN BATCH " +
+                   "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 0, 0, 0, 0) USING TIMESTAMP 5; " +
+                   "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 0, 1, 0, 1) USING TIMESTAMP 5; " +
+                   "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 1, 0, 1, 0) USING TIMESTAMP 5; " +
+                   "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 1, 1, 1, 1) USING TIMESTAMP 5; " +
+                   "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 1, 2, 1, 2) USING TIMESTAMP 5; " +
+                   "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 1, 3, 1, 3) USING TIMESTAMP 5; " +
+                   "INSERT INTO " + table + " (k1, c1, c2, v1, v2) VALUES (1, 2, 0, 2, 0) USING TIMESTAMP 5; " +
+                   "APPLY BATCH");
+
+        assertRowsIgnoringOrder(execute("select * from %s"),
+                                row(1, 0, 0, 0, 0),
+                                row(1, 0, 1, 0, 1),
+                                row(1, 2, 0, 2, 0));
+        assertRowsIgnoringOrder(execute("select k1,c1,c2,v1,v2 from view1"),
+                                row(1, 0, 0, 0, 0),
+                                row(1, 0, 1, 0, 1),
+                                row(1, 2, 0, 2, 0));
+    }
+
+    @Test
     public void testPartitionTombstone() throws Throwable
     {
         createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1, c1))");
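
The rows expected by the two assertions above follow from timestamp reconciliation: the range deletion written at timestamp 10 covers every clustering under k1 = 1, c1 = 1, so the four batched rows with c1 = 1 (written at timestamp 5) are shadowed, while the c1 = 0 and c1 = 2 rows stay visible in both the base table and the view. A minimal standalone sketch of that reconciliation, illustrative only and not Cassandra's merge code:

    // Tiny sketch of the timestamp reconciliation behind the assertions above.
    public class ShadowSketch
    {
        public static void main(String[] args)
        {
            long deletionTs = 10;      // DELETE ... USING TIMESTAMP 10 WHERE k1 = 1 AND c1 = 1
            long insertTs = 5;         // all batched INSERTs use TIMESTAMP 5
            int deletedC1 = 1;
            int[][] clusterings = { {0, 0}, {0, 1}, {1, 0}, {1, 1}, {1, 2}, {1, 3}, {2, 0} };

            for (int[] ck : clusterings)
            {
                // A deletion shadows data whose timestamp is <= the deletion's timestamp.
                boolean shadowed = ck[0] == deletedC1 && insertTs <= deletionTs;
                System.out.println("(k1=1, c1=" + ck[0] + ", c2=" + ck[1] + ") -> "
                                   + (shadowed ? "shadowed" : "visible"));
            }
            // Visible: (1,0,0), (1,0,1), (1,2,0) -- matching both assertRowsIgnoringOrder calls.
        }
    }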

http://git-wip-us.apache.org/repos/asf/cassandra/blob/975c3d81/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 02b642e..b1a374f 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -20,11 +20,12 @@
  */
 package org.apache.cassandra.db;

+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Iterator;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -32,23 +33,28 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.filter.AbstractClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -58,6 +64,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.btree.BTreeSet;

 public class SinglePartitionSliceCommandTest
 {
@@ -70,6 +77,9 @@ public class SinglePartitionSliceCommandTest
     private static ColumnDefinition v;
     private static ColumnDefinition s;

+    private static final String TABLE_SCLICES = "tbl_slices";
+    private static CFMetaData CFM_SLICES;
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -80,17 +90,28 @@ public class SinglePartitionSliceCommandTest
                          .addRegularColumn("v", UTF8Type.instance)
                          .build();

+        CFM_SLICES = CFMetaData.Builder.create(KEYSPACE, TABLE_SCLICES)
+                                       .addPartitionKey("k", UTF8Type.instance)
+                                       .addClusteringColumn("c1", Int32Type.instance)
+                                       .addClusteringColumn("c2", Int32Type.instance)
+                                       .addRegularColumn("v", IntegerType.instance)
+                                       .build();
+
         SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm);
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm, CFM_SLICES);
+
         cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
         v = cfm.getColumnDefinition(new ColumnIdentifier("v", true));
         s = cfm.getColumnDefinition(new ColumnIdentifier("s", true));
+
+        CFM_SLICES = Schema.instance.getCFMetaData(KEYSPACE, TABLE_SCLICES);
     }

     @Before
     public void truncate()
     {
         Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).truncateBlocking();
+        Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE_SCLICES).truncateBlocking();
     }

     @Test
@@ -143,6 +164,124 @@ public class SinglePartitionSliceCommandTest
         Assert.assertEquals(Collections.emptyList(), rowIter.cells);
     }

+    @Test
+    public void testMultiNamesCommandWithFlush()
+    {
+        testMultiNamesOrSlicesCommand(true, false);
+    }
+
+    @Test
+    public void testMultiNamesCommandWithoutFlush()
+    {
+        testMultiNamesOrSlicesCommand(false, false);
+    }
+
+    @Test
+    public void testMultiSlicesCommandWithFlush()
+    {
+        testMultiNamesOrSlicesCommand(true, true);
+    }
+
+    @Test
+    public void testMultiSlicesCommandWithoutFlush()
+    {
+        testMultiNamesOrSlicesCommand(false, true);
+    }
+
+    private AbstractClusteringIndexFilter createClusteringFilter(int uniqueCk1, int uniqueCk2, boolean isSlice)
+    {
+        Slices.Builder slicesBuilder = new Slices.Builder(CFM_SLICES.comparator);
+        BTreeSet.Builder<Clustering> namesBuilder = BTreeSet.builder(CFM_SLICES.comparator);
+
+        for (int ck1 = 0; ck1 < uniqueCk1; ck1++)
+        {
+            for (int ck2 = 0; ck2 < uniqueCk2; ck2++)
+            {
+                if (isSlice)
+                    slicesBuilder.add(Slice.make(Util.clustering(CFM_SLICES.comparator, ck1, ck2)));
+                else
+                    namesBuilder.add(Util.clustering(CFM_SLICES.comparator, ck1, ck2));
+            }
+        }
+        if (isSlice)
+            return new ClusteringIndexSliceFilter(slicesBuilder.build(), false);
+        return new ClusteringIndexNamesFilter(namesBuilder.build(), false);
+    }
+
+    private void testMultiNamesOrSlicesCommand(boolean flush, boolean isSlice)
+    {
+        boolean isTombstone = flush || isSlice;
+        int deletionTime = 5;
+        int ck1 = 1;
+        int uniqueCk1 = 2;
+        int uniqueCk2 = 3;
+
+        DecoratedKey key = CFM_SLICES.decorateKey(ByteBufferUtil.bytes("k"));
+        QueryProcessor.executeInternal(String.format("DELETE FROM ks.tbl_slices USING TIMESTAMP %d WHERE k='k' AND c1=%d",
+                                                     deletionTime,
+                                                     ck1));
+
+        if (flush)
+            Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE_SCLICES).forceBlockingFlush();
+
+        AbstractClusteringIndexFilter clusteringFilter = createClusteringFilter(uniqueCk1, uniqueCk2, isSlice);
+        ReadCommand cmd = SinglePartitionReadCommand.create(CFM_SLICES,
+                                                            FBUtilities.nowInSeconds(),
+                                                            ColumnFilter.all(CFM_SLICES),
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            clusteringFilter);
+
+        UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup());
+        assert partitionIterator.hasNext();
+        UnfilteredRowIterator partition = partitionIterator.next();
+
+        int count = 0;
+        boolean open = true;
+        while (partition.hasNext())
+        {
+            Unfiltered unfiltered = partition.next();
+            if (isTombstone)
+            {
+                assertTrue(unfiltered.isRangeTombstoneMarker());
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+
+                // check if it's open-close pair
+                assertTrue(marker.isOpen(false) == open);
+                // check deletion time same as Range Deletion
+                if (open)
+                    assertEquals(deletionTime, marker.openDeletionTime(false).markedForDeleteAt());
+                else
+                    assertEquals(deletionTime, marker.closeDeletionTime(false).markedForDeleteAt());
+
+                // check clustering values
+                Clustering clustering = Util.clustering(CFM_SLICES.comparator, ck1, count / 2);
+                for (int i = 0; i < CFM_SLICES.comparator.size(); i++)
+                {
+                    int cmp = CFM_SLICES.comparator.compareComponent(i,
+                                                                     clustering.values[i],
+                                                                     marker.clustering().values[i]);
+                    assertEquals(0, cmp);
+                }
+                open = !open;
+            }
+            else
+            {
+                // deleted row
+                assertTrue(unfiltered.isRow());
+                Row row = (Row) unfiltered;
+                assertEquals(deletionTime, row.deletion().time().markedForDeleteAt());
+                assertEquals(0, row.size()); // no btree
+            }
+            count++;
+        }
+        if (isTombstone)
+            assertEquals(uniqueCk2 * 2, count); // open and close range tombstones
+        else
+            assertEquals(uniqueCk2, count);
+    }
+
     private void checkForS(UnfilteredPartitionIterator pi)
     {
         Assert.assertTrue(pi.toString(), pi.hasNext());
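
For reference, the verification loop in testMultiNamesOrSlicesCommand expects, in the tombstone cases, one open/close range-tombstone-marker pair per queried c2 value under the deleted c1 (deletion time 5, uniqueCk2 = 3, hence six markers), and in the unflushed names case three deleted rows instead. The standalone sketch below just prints that expected sequence; it is an illustration of the assertions, not output captured from a test run.

    // Illustration of the unfiltereds the slices variant of the test asserts:
    // an open/close range tombstone marker pair per queried c2 under the deleted c1.
    public class ExpectedMarkerStream
    {
        public static void main(String[] args)
        {
            int ck1 = 1;
            int uniqueCk2 = 3;
            long deletionTime = 5;

            boolean open = true;
            for (int count = 0; count < uniqueCk2 * 2; count++)
            {
                int c2 = count / 2;   // the open and the close share the same clustering prefix
                System.out.printf("%s (c1=%d, c2=%d) @ %d%n",
                                  open ? "open " : "close", ck1, c2, deletionTime);
                open = !open;
            }
            // Six markers; the names-filter, unflushed case yields uniqueCk2 deleted rows instead.
        }
    }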