This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 092915a  Don't skip sstables with partition deletes
092915a is described below

commit 092915ad19d25c1ba93f7968210b88fb6e4b9180
Author: Sam Tunnicliffe <s...@beobal.com>
AuthorDate: Tue Apr 7 16:35:50 2020 +0100

    Don't skip sstables with partition deletes

    Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
    CASSANDRA-15690
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/SinglePartitionReadCommand.java   | 103 +++++++++------
 .../distributed/test/SimpleReadWriteTest.java      | 101 ++++++++++++++++++++
 3 files changed, 148 insertions(+), 57 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5af0ef3..91b8241 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690)
  * Memtable memory allocations may deadlock (CASSANDRA-15367)
  * Run evictFromMembership in GossipStage (CASSANDRA-15592)
 Merged from 2.2:
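The SinglePartitionReadCommand hunk that follows corrects the inequality in the method comment (with sstables sorted by descending maxTimestamp, a later sstable s1 satisfies maxTimestamp_s1 < maxTimestamp_s0) and restructures the selection loop. As a minimal, self-contained sketch of the one-pass elimination that ordering justifies (the array and variable names here are invented for illustration, not Cassandra's API):

public class EarlyBreakSketch
{
    public static void main(String[] args)
    {
        long[] maxTimestampsDesc = { 9, 7, 4, 2 }; // sstable maxTimestamps after the descending sort
        long mostRecentPartitionTombstone = 5;     // newest partition delete read from an earlier sstable

        for (int i = 0; i < maxTimestampsDesc.length; i++)
        {
            if (maxTimestampsDesc[i] < mostRecentPartitionTombstone)
            {
                // every cell in sstable i has timestamp <= maxTimestampsDesc[i] < tombstone, and the
                // descending sort guarantees the same for every later sstable, so one break suffices
                System.out.println("stop at sstable index " + i); // prints: stop at sstable index 2
                break;
            }
        }
    }
}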
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 15b74d8..2e014ba 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -705,92 +705,81 @@ public class SinglePartitionReadCommand extends ReadCommand
          * We can't eliminate full sstables based on the timestamp of what we've already read like
          * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
          * we've read. We still rely on the sstable ordering by maxTimestamp since if
-         *   maxTimestamp_s1 > maxTimestamp_s0,
+         *   maxTimestamp_s1 < maxTimestamp_s0,
          * we're guaranteed that s1 cannot have a row tombstone such that
          *   timestamp(tombstone) > maxTimestamp_s0
          * since we necessarily have
          *   timestamp(tombstone) <= maxTimestamp_s1
-         * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
-         * in one pass, and minimize the number of sstables for which we read a partition tombstone.
+         * In other words, iterating in descending maxTimestamp order allow to do our mostRecentPartitionTombstone
+         * elimination in one pass, and minimize the number of sstables for which we read a partition tombstone.
          */
         Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-        List<SSTableReader> skippedSSTables = null;
         long mostRecentPartitionTombstone = Long.MIN_VALUE;
-        long minTimestamp = Long.MAX_VALUE;
         int nonIntersectingSSTables = 0;
+        int includedDueToTombstones = 0;
         SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

         for (SSTableReader sstable : view.sstables)
         {
-            minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
             // if we've already seen a partition tombstone with a timestamp greater
             // than the most recent update to this sstable, we can skip it
             if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
                 break;

-            if (!shouldInclude(sstable))
-            {
-                nonIntersectingSSTables++;
-                // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
-                if (sstable.hasTombstones())
-                {
-                    if (skippedSSTables == null)
-                        skippedSSTables = new ArrayList<>();
-                    skippedSSTables.add(sstable);
-                }
-                continue;
-            }
-
-            if (!sstable.isRepaired())
-                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-
-            // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
-            @SuppressWarnings("resource")
-            UnfilteredRowIterator iter = filter.filter(
-                    sstable.iterator(partitionKey(),
-                                     columnFilter(),
-                                     filter.isReversed(),
-                                     isForThrift(),
-                                     metricsCollector)
-            );
-
-            if (isForThrift())
-                iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
-
-            iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));
-
-            mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
-        }
-
-        int includedDueToTombstones = 0;
-        // Check for partition tombstones in the skipped sstables
-        if (skippedSSTables != null)
-        {
-            for (SSTableReader sstable : skippedSSTables)
+            if (shouldInclude(sstable))
             {
-                if (sstable.getMaxTimestamp() <= minTimestamp)
-                    continue;
+                if (!sstable.isRepaired())
+                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());

-                @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
+                // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                @SuppressWarnings("resource")
                 UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(),
                                                                             columnFilter(),
                                                                             filter.isReversed(),
                                                                             isForThrift(),
                                                                             metricsCollector));
-                if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
-                {
-                    iterators.add(iter);
-                    if (!sstable.isRepaired())
-                        oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-                    includedDueToTombstones++;
-                }
-                else
+                if (isForThrift())
+                    iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
+
+                iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));
+                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                        iter.partitionLevelDeletion().markedForDeleteAt());
+            }
+            else
+            {
+                nonIntersectingSSTables++;
+                // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+                if (sstable.hasTombstones())
                 {
-                    iter.close();
+                    // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                    @SuppressWarnings("resource")
+                    UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(),
+                                                                                columnFilter(),
+                                                                                filter.isReversed(),
+                                                                                isForThrift(),
+                                                                                metricsCollector));
+                    // if the sstable contains a partition delete, then we must include it regardless of whether it
+                    // shadows any other data seen locally as we can't guarantee that other replicas have seen it
+                    if (!iter.partitionLevelDeletion().isLive())
+                    {
+                        includedDueToTombstones++;
+                        iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));
+                        if (!sstable.isRepaired())
+                            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+                        mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                                iter.partitionLevelDeletion().markedForDeleteAt());
+                    }
+                    else
+                    {
+                        iter.close();
+                    }
                 }
             }
         }
+
         if (Tracing.isTracing())
             Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
                           nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
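Distilled out of the diff context, the reshaped loop makes a single decision per sstable. Below is a compact sketch of that selection logic using a toy stand-in type; SSTableStub, selectForQuery and intersectsSlice are invented names for illustration, while the real code works with SSTableReader, shouldInclude() and UnfilteredRowIterator and additionally tracks repaired status and read metrics:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SkipLogicSketch
{
    static class SSTableStub
    {
        final String name;
        final long maxTimestamp;             // newest write timestamp in the sstable
        final long partitionDeleteTimestamp; // Long.MIN_VALUE when there is no partition tombstone
        final boolean intersectsSlice;       // stand-in for shouldInclude(sstable)

        SSTableStub(String name, long maxTimestamp, long partitionDeleteTimestamp, boolean intersectsSlice)
        {
            this.name = name;
            this.maxTimestamp = maxTimestamp;
            this.partitionDeleteTimestamp = partitionDeleteTimestamp;
            this.intersectsSlice = intersectsSlice;
        }
    }

    static List<SSTableStub> selectForQuery(List<SSTableStub> sstables)
    {
        // iterate newest-first so a partition tombstone can end the scan early
        sstables.sort(Comparator.comparingLong((SSTableStub s) -> s.maxTimestamp).reversed());
        List<SSTableStub> selected = new ArrayList<>();
        long mostRecentPartitionTombstone = Long.MIN_VALUE;
        for (SSTableStub sstable : sstables)
        {
            // all remaining sstables hold only data older than a tombstone already read
            if (sstable.maxTimestamp < mostRecentPartitionTombstone)
                break;

            if (sstable.intersectsSlice)
            {
                selected.add(sstable);
                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                        sstable.partitionDeleteTimestamp);
            }
            else if (sstable.partitionDeleteTimestamp != Long.MIN_VALUE)
            {
                // the fix: a non-intersecting sstable carrying a partition delete is always
                // included, since other replicas may hold data that only this tombstone shadows
                selected.add(sstable);
                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                        sstable.partitionDeleteTimestamp);
            }
        }
        return selected;
    }

    public static void main(String[] args)
    {
        List<SSTableStub> view = new ArrayList<>(Arrays.asList(
                new SSTableStub("row-data", 2, Long.MIN_VALUE, true),
                new SSTableStub("partition-delete", 1, 1, false)));
        // both are selected; before the patch, "partition-delete" could be dropped based on
        // purely local timestamp comparisons
        selectForQuery(view).forEach(s -> System.out.println(s.name));
    }
}

The pre-patch code deferred non-intersecting, tombstone-bearing sstables to a second pass and re-included them only when their partition delete timestamp exceeded the locally observed minimum timestamp; the tests below construct exactly the cases where that local heuristic goes wrong.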
@SuppressWarnings("resource") + UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), + columnFilter(), + filter.isReversed(), + isForThrift(), + metricsCollector)); + // if the sstable contains a partition delete, then we must include it regardless of whether it + // shadows any other data seen locally as we can't guarantee that other replicas have seen it + if (!iter.partitionLevelDeletion().isLive()) + { + includedDueToTombstones++; + iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false)); + if (!sstable.isRepaired()) + oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); + mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, + iter.partitionLevelDeletion().markedForDeleteAt()); + } + else + { + iter.close(); + } + } } } + if (Tracing.isTracing()) Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones", nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones); diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java index f1f8674..75e5ba9 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@ -1,12 +1,16 @@ package org.apache.cassandra.distributed.test; +import java.util.Set; + import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.io.sstable.format.SSTableReader; import static org.junit.Assert.assertEquals; @@ -269,6 +273,103 @@ public class SimpleReadWriteTest extends SharedClusterTestBase assertEquals(100, readCount1); } + + @Test + public void skippedSSTableWithPartitionDeletionTest() throws Throwable + { + try (Cluster cluster = init(Cluster.create(2))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"); + // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp + cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0"); + // and a row from a different partition, to provide the sstable's min/max clustering + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2"); + cluster.get(1).flush(KEYSPACE); + // expect a single sstable, where minTimestamp equals the timestamp of the partition delete + cluster.get(1).runOnInstance(() -> { + Set<SSTableReader> sstables = Keyspace.open(KEYSPACE) + .getColumnFamilyStore("tbl") + .getLiveSSTables(); + assertEquals(1, sstables.size()); + assertEquals(1, sstables.iterator().next().getMinTimestamp()); + }); + + // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"); + + + Object[][] rows = cluster.coordinator(1) + .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5", + ConsistencyLevel.ALL); + assertEquals(0, rows.length); + } + } + + @Test + public void 
+    @Test
+    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+            // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
+            // this sstable's minTimestamp is > the maxTimestamp of the first sstable. This would cause the first
+            // sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
+            // know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                              ConsistencyLevel.ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+            // node 1 (0, 6, 6) was not
+            assertRows(rows, new Object[] { 0, 6, 6 });
+        }
+    }
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
+    {
+        // ensure previously skipped sstables are not excluded just because the partition delete ts is < the local min ts
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == 1, maxTimestamp == 3 and is skipped due to its min/max clusterings. Now we
+            // insert a row which is not shadowed by the partition delete and flush to a second sstable. The first
+            // sstable has a maxTimestamp > the min timestamp of all sstables, so it is a candidate for reinclusion
+            // in the merge. However, the second sstable's minTimestamp is > the partition delete timestamp. This
+            // would cause the first sstable not to be reincluded in the merge input, but we can't really make that
+            // decision as we don't know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                              ConsistencyLevel.ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+            // node 1 (0, 6, 6) was not
+            assertRows(rows, new Object[] { 0, 6, 6 });
+        }
+    }
+
     private long readCount(IInvokableInstance instance)
     {
         return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
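All three tests guard the same failure mode: a replica that locally skips a tombstone-bearing sstable lets another replica's shadowed row win at the coordinator. A minimal sketch of that reconciliation, assuming simple last-write-wins semantics (coordinatorMerge and the timestamp encoding are invented for illustration; real reads merge UnfilteredRowIterators and resolve per cell):

import java.util.OptionalLong;

public class ResurrectionSketch
{
    // toy coordinator reconciliation for the queried row (pk=0, ck=10): node 1 may contribute a
    // partition delete timestamp, node 2 may contribute a live row timestamp
    static OptionalLong coordinatorMerge(OptionalLong node1DeleteTs, OptionalLong node2RowTs)
    {
        // last-write-wins: the row survives only if it is newer than every deletion seen
        if (node1DeleteTs.isPresent() && node2RowTs.isPresent()
            && node2RowTs.getAsLong() <= node1DeleteTs.getAsLong())
            return OptionalLong.empty(); // row is shadowed by the tombstone
        return node2RowTs;
    }

    public static void main(String[] args)
    {
        // node 2 wrote (0, 10, 10) at ts=0; node 1 holds a partition delete at ts=1
        System.out.println("tombstone returned: row visible = "
                           + coordinatorMerge(OptionalLong.of(1), OptionalLong.of(0)).isPresent());  // false

        // pre-patch, node 1 could skip its tombstone-bearing sstable on purely local timestamp
        // grounds, so the coordinator never sees the delete and the dead row is resurrected
        System.out.println("tombstone skipped:  row visible = "
                           + coordinatorMerge(OptionalLong.empty(), OptionalLong.of(0)).isPresent()); // true
    }
}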