Merge commit '66115139addfb2bb6e26fa85e4225a1178d2e99c' into trunk * commit '66115139addfb2bb6e26fa85e4225a1178d2e99c': Fix sstable reader to support range-tombstone-marker for multi-slices
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a624748 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a624748 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a624748 Branch: refs/heads/trunk Commit: 9a62474822149bca358d7535e6dd7210ca17277e Parents: eb76692 6611513 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Sep 20 15:16:53 2017 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Sep 20 15:17:32 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../columniterator/AbstractSSTableIterator.java | 7 - .../db/columniterator/SSTableIterator.java | 8 +- .../columniterator/SSTableReversedIterator.java | 2 +- .../org/apache/cassandra/cql3/ViewTest.java | 49 +++++++ .../db/SinglePartitionSliceCommandTest.java | 143 ++++++++++++++++++- 6 files changed, 195 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a624748/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a624748/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a624748/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a624748/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a624748/test/unit/org/apache/cassandra/cql3/ViewTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a624748/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java index f79066b,7ad6198..d03d3bc --- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java +++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java @@@ -20,9 -20,12 +20,10 @@@ */ package org.apache.cassandra.db; + import static org.junit.Assert.*; + import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collections; import java.util.Iterator; - import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@@ -30,14 -33,17 +31,17 @@@ import org.junit.Test import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.cql3.ColumnIdentifier; ++import org.apache.cassandra.Util; import org.apache.cassandra.cql3.QueryProcessor; + import org.apache.cassandra.cql3.UntypedResultSet; + import org.apache.cassandra.db.filter.AbstractClusteringIndexFilter; + import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; @@@ -64,33 -74,221 +72,162 @@@ public class SinglePartitionSliceComman private static final String KEYSPACE = "ks"; private static final String TABLE = "tbl"; - private static CFMetaData cfm; - private static ColumnDefinition v; - private static ColumnDefinition s; + private static TableMetadata metadata; + private static ColumnMetadata v; + private static ColumnMetadata s; + private static final String TABLE_SCLICES = "tbl_slices"; - private static CFMetaData CFM_SLICES; ++ private static TableMetadata CFM_SLICES; + @BeforeClass public static void defineSchema() throws ConfigurationException { DatabaseDescriptor.daemonInitialization(); - cfm = CFMetaData.Builder.create(KEYSPACE, TABLE) - .addPartitionKey("k", UTF8Type.instance) - .addStaticColumn("s", UTF8Type.instance) - .addClusteringColumn("i", IntegerType.instance) - .addRegularColumn("v", UTF8Type.instance) - .build(); - - CFM_SLICES = CFMetaData.Builder.create(KEYSPACE, TABLE_SCLICES) - .addPartitionKey("k", UTF8Type.instance) - .addClusteringColumn("c1", Int32Type.instance) - .addClusteringColumn("c2", Int32Type.instance) - .addRegularColumn("v", IntegerType.instance) - .build(); + metadata = + TableMetadata.builder(KEYSPACE, TABLE) + .addPartitionKeyColumn("k", UTF8Type.instance) + .addStaticColumn("s", UTF8Type.instance) + .addClusteringColumn("i", IntegerType.instance) + .addRegularColumn("v", UTF8Type.instance) + .build(); + ++ CFM_SLICES = TableMetadata.builder(KEYSPACE, TABLE_SCLICES) ++ .addPartitionKeyColumn("k", UTF8Type.instance) ++ .addClusteringColumn("c1", Int32Type.instance) ++ .addClusteringColumn("c2", Int32Type.instance) ++ .addRegularColumn("v", IntegerType.instance) ++ .build(); + SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), metadata); - SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm, CFM_SLICES); - - cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE); - v = cfm.getColumnDefinition(new ColumnIdentifier("v", true)); - s = cfm.getColumnDefinition(new ColumnIdentifier("s", true)); - - CFM_SLICES = Schema.instance.getCFMetaData(KEYSPACE, TABLE_SCLICES); ++ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), metadata, CFM_SLICES); + v = metadata.getColumn(new ColumnIdentifier("v", true)); + s = metadata.getColumn(new ColumnIdentifier("s", true)); } @Before public void truncate() { Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).truncateBlocking(); + Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE_SCLICES).truncateBlocking(); + } + + @Test - public void staticColumnsAreFiltered() throws IOException - { - DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k")); - - UntypedResultSet rows; - - QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s, i, v) VALUES ('k', 's', 0, 'v')"); - QueryProcessor.executeInternal("DELETE v FROM ks.tbl WHERE k='k' AND i=0"); - QueryProcessor.executeInternal("DELETE FROM ks.tbl WHERE k='k' AND i=0"); - rows = QueryProcessor.executeInternal("SELECT * FROM ks.tbl WHERE k='k' AND i=0"); - - for (UntypedResultSet.Row row: rows) - { - logger.debug("Current: k={}, s={}, v={}", (row.has("k") ? row.getString("k") : null), (row.has("s") ? row.getString("s") : null), (row.has("v") ? row.getString("v") : null)); - } - - assert rows.isEmpty(); - - ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(v)); - ByteBuffer zero = ByteBufferUtil.bytes(0); - Slices slices = Slices.with(cfm.comparator, Slice.make(ClusteringBound.inclusiveStartOf(zero), ClusteringBound.inclusiveEndOf(zero))); - ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false); - ReadCommand cmd = SinglePartitionReadCommand.create(true, - cfm, - FBUtilities.nowInSeconds(), - columnFilter, - RowFilter.NONE, - DataLimits.NONE, - key, - sliceFilter); - - DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21)); - ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21); - DataInputPlus in = new DataInputBuffer(out.buffer(), true); - cmd = ReadCommand.legacyReadCommandSerializer.deserialize(in, MessagingService.VERSION_21); - - logger.debug("ReadCommand: {}", cmd); - try (ReadExecutionController controller = cmd.executionController(); - UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(controller)) - { - ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd); - - logger.debug("creating response: {}", response); - try (UnfilteredPartitionIterator pIter = response.makeIterator(cmd)) - { - assert pIter.hasNext(); - try (UnfilteredRowIterator partition = pIter.next()) - { - LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(cmd, partition); - Assert.assertEquals(Collections.emptyList(), rowIter.cells); - } - } - } - } - - @Test + public void testMultiNamesCommandWithFlush() + { + testMultiNamesOrSlicesCommand(true, false); + } + + @Test + public void testMultiNamesCommandWithoutFlush() + { + testMultiNamesOrSlicesCommand(false, false); + } + + @Test + public void testMultiSlicesCommandWithFlush() + { + testMultiNamesOrSlicesCommand(true, true); + } + + @Test + public void testMultiSlicesCommandWithoutFlush() + { + testMultiNamesOrSlicesCommand(false, true); + } + + private AbstractClusteringIndexFilter createClusteringFilter(int uniqueCk1, int uniqueCk2, boolean isSlice) + { + Slices.Builder slicesBuilder = new Slices.Builder(CFM_SLICES.comparator); + BTreeSet.Builder<Clustering> namesBuilder = BTreeSet.builder(CFM_SLICES.comparator); + + for (int ck1 = 0; ck1 < uniqueCk1; ck1++) + { + for (int ck2 = 0; ck2 < uniqueCk2; ck2++) + { + if (isSlice) + slicesBuilder.add(Slice.make(Util.clustering(CFM_SLICES.comparator, ck1, ck2))); + else + namesBuilder.add(Util.clustering(CFM_SLICES.comparator, ck1, ck2)); + } + } + if (isSlice) + return new ClusteringIndexSliceFilter(slicesBuilder.build(), false); + return new ClusteringIndexNamesFilter(namesBuilder.build(), false); + } + + private void testMultiNamesOrSlicesCommand(boolean flush, boolean isSlice) + { + boolean isTombstone = flush || isSlice; + int deletionTime = 5; + int ck1 = 1; + int uniqueCk1 = 2; + int uniqueCk2 = 3; + - DecoratedKey key = CFM_SLICES.decorateKey(ByteBufferUtil.bytes("k")); ++ DecoratedKey key = Util.dk(ByteBufferUtil.bytes("k")); + QueryProcessor.executeInternal(String.format("DELETE FROM ks.tbl_slices USING TIMESTAMP %d WHERE k='k' AND c1=%d", + deletionTime, + ck1)); + + if (flush) + Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE_SCLICES).forceBlockingFlush(); + + AbstractClusteringIndexFilter clusteringFilter = createClusteringFilter(uniqueCk1, uniqueCk2, isSlice); + ReadCommand cmd = SinglePartitionReadCommand.create(CFM_SLICES, + FBUtilities.nowInSeconds(), + ColumnFilter.all(CFM_SLICES), + RowFilter.NONE, + DataLimits.NONE, + key, + clusteringFilter); + + UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(cmd.executionController()); + assert partitionIterator.hasNext(); + UnfilteredRowIterator partition = partitionIterator.next(); + + int count = 0; + boolean open = true; + while (partition.hasNext()) + { + Unfiltered unfiltered = partition.next(); + if (isTombstone) + { + assertTrue(unfiltered.isRangeTombstoneMarker()); + RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered; + + // check if it's open-close pair + assertTrue(marker.isOpen(false) == open); + // check deletion time same as Range Deletion + if (open) + assertEquals(deletionTime, marker.openDeletionTime(false).markedForDeleteAt()); + else + assertEquals(deletionTime, marker.closeDeletionTime(false).markedForDeleteAt()); + + // check clustering values + Clustering clustering = Util.clustering(CFM_SLICES.comparator, ck1, count / 2); + for (int i = 0; i < CFM_SLICES.comparator.size(); i++) + { + int cmp = CFM_SLICES.comparator.compareComponent(i, + clustering.getRawValues()[i], + marker.clustering().values[i]); + assertEquals(0, cmp); + } + open = !open; + } + else + { + // deleted row + assertTrue(unfiltered.isRow()); + Row row = (Row) unfiltered; + assertEquals(deletionTime, row.deletion().time().markedForDeleteAt()); + assertEquals(0, row.size()); // no btree + } + count++; + } + if (isTombstone) + assertEquals(uniqueCk2 * 2, count); // open and close range tombstones + else + assertEquals(uniqueCk2, count); } private void checkForS(UnfilteredPartitionIterator pi) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org