This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9a280516ca8b9e730ae0648e5e29ee6280605132 Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Wed Dec 18 18:31:36 2019 +0000 Exclude purgeable tombstones from repaired data digest Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-15462 --- CHANGES.txt | 1 + .../cassandra/db/PartitionRangeReadCommand.java | 2 +- src/java/org/apache/cassandra/db/ReadCommand.java | 160 ++++++++++-- .../cassandra/db/SinglePartitionReadCommand.java | 2 +- .../cassandra/db/partitions/PurgeFunction.java | 7 +- .../distributed/test/RepairDigestTrackingTest.java | 54 ++++ .../org/apache/cassandra/db/ReadCommandTest.java | 272 ++++++++++++++++++++- 7 files changed, 473 insertions(+), 25 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 3d5217a..43126ed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha3 + * Exclude purgeable tombstones from repaired data tracking (CASSANDRA-15462) * Exclude legacy counter shards from repaired data tracking (CASSANDRA-15461) * Make it easier to add trace headers to messages (CASSANDRA-15499) * Fix and optimise partial compressed sstable streaming (CASSANDRA-13938) diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 2145389..cb68950 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -282,7 +282,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR if (inputCollector.isEmpty()) return EmptyIterators.unfilteredPartition(metadata()); - return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators()), cfs); + return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone)), cfs); } catch (RuntimeException | Error e) { diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 9485abc..4f8ea3e 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -90,7 +90,7 @@ public abstract class ReadCommand extends AbstractReadQuery // for data queries, coordinators may request information on the repaired data used in constructing the response private boolean trackRepairedStatus = false; - // tracker for repaired data, initialized to singelton null object + // tracker for repaired data, initialized to singleton null object private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo() { void trackPartitionKey(DecoratedKey key){} @@ -724,7 +724,7 @@ public abstract class ReadCommand extends AbstractReadQuery } private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator, - final RepairedDataInfo repairedDataInfo) + final RepairedDataInfo repairedDataInfo) { class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator> { @@ -744,6 +744,7 @@ public abstract class ReadCommand extends AbstractReadQuery { protected DecoratedKey applyToPartitionKey(DecoratedKey key) { + repairedDataInfo.onNewPartition(iterator); repairedDataInfo.trackPartitionKey(key); return key; } @@ -762,7 +763,7 @@ public abstract class ReadCommand extends AbstractReadQuery protected Row applyToStatic(Row row) { - repairedDataInfo.trackRow(row); + repairedDataInfo.trackStaticRow(row); return row; } @@ -771,21 +772,48 @@ public abstract class 
ReadCommand extends AbstractReadQuery repairedDataInfo.trackRow(row); return row; } - } + protected void onPartitionClose() + { + repairedDataInfo.onPartitionClose(); + } + } return Transformation.apply(iterator, new WithTracking()); } private static class RepairedDataInfo { - private Digest hasher; + // Keeps a digest of the partition currently being processed. Since we won't know + // whether a partition will be fully purged from a read result until it's been + // consumed, we buffer this per-partition digest and add it to the final digest + // when the partition is closed (if it wasn't fully purged). + private Digest perPartitionDigest; + private Digest perCommandDigest; private boolean isConclusive = true; + // Doesn't actually purge from the underlying iterators, but excludes purged data from the digest. + // The purger can't be initialized until we've iterated all the sstables for the query, + // as it requires the oldest unrepaired tombstone. + private RepairedDataPurger purger; + private boolean isFullyPurged = true; + ByteBuffer getDigest() { - return hasher == null + return perCommandDigest == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER - : ByteBuffer.wrap(getHasher().digest()); + : ByteBuffer.wrap(perCommandDigest.digest()); + } + + protected void onNewPartition(UnfilteredRowIterator partition) + { + assert purger != null; + purger.setCurrentKey(partition.partitionKey()); + purger.setIsReverseOrder(partition.isReverseOrder()); + } + + protected void setPurger(RepairedDataPurger purger) + { + this.purger = purger; } boolean isConclusive() @@ -800,30 +828,128 @@ public abstract class ReadCommand extends AbstractReadQuery void trackPartitionKey(DecoratedKey key) { - getHasher().update(key.getKey()); + getPerPartitionDigest().update(key.getKey()); } void trackDeletion(DeletionTime deletion) { - deletion.digest(getHasher()); + assert purger != null; + DeletionTime purged = purger.applyToDeletion(deletion); + if (!purged.isLive()) + isFullyPurged = false; + + purged.digest(getPerPartitionDigest()); } void trackRangeTombstoneMarker(RangeTombstoneMarker marker) { - marker.digest(getHasher()); + assert purger != null; + RangeTombstoneMarker purged = purger.applyToMarker(marker); + if (purged != null) + { + isFullyPurged = false; + purged.digest(getPerPartitionDigest()); + } + } + + void trackStaticRow(Row row) + { + assert purger != null; + Row purged = purger.applyToRow(row); + if (!purged.isEmpty()) + { + isFullyPurged = false; + purged.digest(getPerPartitionDigest()); + } } void trackRow(Row row) { - row.digest(getHasher()); + assert purger != null; + Row purged = purger.applyToRow(row); + if (purged != null) + { + isFullyPurged = false; + purged.digest(getPerPartitionDigest()); + } + } + + private Digest getPerPartitionDigest() + { + if (perPartitionDigest == null) + perPartitionDigest = Digest.forRepairedDataTracking(); + + return perPartitionDigest; + } + + private void onPartitionClose() + { + if (perPartitionDigest != null) + { + // If the partition wasn't completely emptied by the purger, + // calculate the digest for the partition and use it to + // update the overall digest + if (!isFullyPurged) + { + if (perCommandDigest == null) + perCommandDigest = Digest.forRepairedDataTracking(); + + byte[] partitionDigest = perPartitionDigest.digest(); + perCommandDigest.update(partitionDigest, 0, partitionDigest.length); + isFullyPurged = true; + } + + perPartitionDigest = null; + } + } + }
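In isolation, the buffer-and-commit pattern that RepairedDataInfo implements above reduces to the following standalone sketch. It uses java.security.MessageDigest and invented names (PartitionDigestBuffer, trackUnpurged) purely for illustration; it is not the commit's Digest API.

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    class PartitionDigestBuffer
    {
        private MessageDigest perPartitionDigest; // buffered, one partition at a time
        private MessageDigest perCommandDigest;   // spans the whole read command
        private boolean isFullyPurged = true;

        // called for each piece of data in the current partition that survives purging
        void trackUnpurged(byte[] bytes) throws NoSuchAlgorithmException
        {
            isFullyPurged = false;
            if (perPartitionDigest == null)
                perPartitionDigest = MessageDigest.getInstance("MD5");
            perPartitionDigest.update(bytes);
        }

        // called when the partition is exhausted: only now do we know whether
        // everything in it was purgeable, so only now can we decide whether its
        // buffered digest should contribute to the command-level digest
        void onPartitionClose() throws NoSuchAlgorithmException
        {
            if (perPartitionDigest != null && !isFullyPurged)
            {
                if (perCommandDigest == null)
                    perCommandDigest = MessageDigest.getInstance("MD5");
                perCommandDigest.update(perPartitionDigest.digest());
            }
            perPartitionDigest = null;
            isFullyPurged = true;
        }

        byte[] digest()
        {
            return perCommandDigest == null ? new byte[0] : perCommandDigest.digest();
        }
    }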
+ + /** + * Although PurgeFunction extends Transformation, this is never applied to an iterator. + * Instead, it is used by RepairedDataInfo during the generation of a repaired data + * digest to exclude data which will actually be purged later on in the read pipeline. + */ + private static class RepairedDataPurger extends PurgeFunction + { + RepairedDataPurger(ColumnFamilyStore cfs, + int nowInSec, + int oldestUnrepairedTombstone) + { + super(nowInSec, + cfs.gcBefore(nowInSec), + oldestUnrepairedTombstone, + cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(), + cfs.metadata.get().enforceStrictLiveness()); } - private Digest getHasher() + protected LongPredicate getPurgeEvaluator() { - if (hasher == null) - hasher = Digest.forRepairedDataTracking(); + return (time) -> true; + } + + void setCurrentKey(DecoratedKey key) + { + super.onNewPartition(key); + } + + void setIsReverseOrder(boolean isReverseOrder) + { + super.setReverseOrder(isReverseOrder); + } - return hasher; + public DeletionTime applyToDeletion(DeletionTime deletionTime) + { + return super.applyToDeletion(deletionTime); + } + + public Row applyToRow(Row row) + { + return super.applyToRow(row); + } + + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + return super.applyToMarker(marker); + } } @@ -912,12 +1038,14 @@ public abstract class ReadCommand extends AbstractReadQuery unrepairedIters.add(iter); } - List<T> finalizeIterators() + List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone) { if (repairedIters.isEmpty()) return unrepairedIters; // merge the repaired data before returning, wrapping in a digest generator + RepairedDataPurger purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone); + repairedDataInfo.setPurger(purger); unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo)); return unrepairedIters; } diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index eb57b93..4daad7d 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -695,7 +695,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar StorageHook.instance.reportRead(cfs.metadata().id, partitionKey()); - return withSSTablesIterated(inputCollector.finalizeIterators(), cfs.metric, metricsCollector); + return withSSTablesIterated(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone), cfs.metric, metricsCollector); } catch (RuntimeException | Error e) { diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java index 8dcd359..d9e9036 100644 --- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java +++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java @@ -60,13 +60,18 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator { } + protected void setReverseOrder(boolean isReverseOrder) + { + this.isReverseOrder = isReverseOrder; + } + @Override @SuppressWarnings("resource") protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { onNewPartition(partition.partitionKey()); - isReverseOrder = partition.isReverseOrder(); + setReverseOrder(partition.isReverseOrder()); UnfilteredRowIterator purged = Transformation.apply(partition, this); if (purged.isEmpty()) {
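One assumption worth spelling out before the tests: RepairedDataPurger's gcBefore argument comes from cfs.gcBefore(nowInSec), which to a first approximation is nowInSec minus the table's gc_grace_seconds. A rough, standalone restatement of the purgeability rule the tests below rely on (simplified; real purging also consults the purge evaluator and the oldest unrepaired tombstone):

    class TombstonePurgeability
    {
        // simplified: cfs.gcBefore(nowInSec) ~= nowInSec - gc_grace_seconds
        static boolean isPurgeable(int localDeletionTime, int nowInSec, int gcGraceSeconds)
        {
            int gcBefore = nowInSec - gcGraceSeconds;
            return localDeletionTime < gcBefore;
        }

        public static void main(String[] args)
        {
            int now = 1_000_000;
            // with gc_grace_seconds=0, a tombstone deleted 2s ago is already purgeable,
            // which is why the distributed test below sleeps before its final read
            System.out.println(isPurgeable(now - 2, now, 0));   // true
            // with gc_grace_seconds=600, the same tombstone must still be retained
            System.out.println(isPurgeable(now - 2, now, 600)); // false
        }
    }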
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java index a987ea3..1af329f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java @@ -23,6 +23,7 @@ import java.io.Serializable; import java.util.EnumSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; @@ -113,6 +114,59 @@ public class RepairDigestTrackingTest extends DistributedTestBase implements Ser } } + @Test + public void testPurgeableTombstonesAreIgnored() throws Throwable + { + try (Cluster cluster = init(Cluster.create(2))) + { + + cluster.get(1).runOnInstance(() -> { + StorageProxy.instance.enableRepairedDataTrackingForRangeReads(); + }); + + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0"); + // on node1 only insert some tombstones, then flush + for (int i = 0; i < 10; i++) + { + cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i); + } + cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush()); + + // insert data on both nodes and flush + for (int i = 0; i < 10; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1", + ConsistencyLevel.ALL, + i, i, i); + } + cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush()); + cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush()); + + // nothing is repaired yet + cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired)); + cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired)); + // mark everything repaired + cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired)); + cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired)); + cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired)); + cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired)); + + // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected + for (int i = 0; i < 10; i++) + { + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?)
USING TIMESTAMP 2", i, i, i * 2); + } + + long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount()); + // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones + TimeUnit.SECONDS.sleep(2); + cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL); + long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount()); + + Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter); + } + } + private void assertNotRepaired(SSTableReader reader) { Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE); } diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index c04f489..4419c70 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -24,6 +24,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.Assert; @@ -40,10 +41,12 @@ import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.CounterColumnType; +import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.Range; @@ -61,8 +64,11 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.consistent.LocalSessionAccessor; import org.apache.cassandra.schema.CachingParams; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; @@ -70,6 +76,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; +import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -85,6 +92,7 @@ public class ReadCommandTest private static final String CF5 = "Standard5"; private static final String CF6 = "Standard6"; private static final String CF7 = "Counter7"; + private static final String CF8 = "Standard8"; private static final InetAddressAndPort REPAIR_COORDINATOR; static { @@ -161,6 +169,14 @@ public class ReadCommandTest .addClusteringColumn("col", AsciiType.instance) .addRegularColumn("c", 
CounterColumnType.instance); + TableMetadata.Builder metadata8 = + TableMetadata.builder(KEYSPACE, CF8) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + .addRegularColumn("a", AsciiType.instance) + .addRegularColumn("b", AsciiType.instance) + .addRegularColumn("c", SetType.getInstance(AsciiType.instance, true)); + SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), @@ -170,7 +186,8 @@ public class ReadCommandTest metadata4, metadata5, metadata6, - metadata7); + metadata7, + metadata8); LocalSessionAccessor.startup(); } @@ -683,7 +700,7 @@ public class ReadCommandTest // execute a read and capture the digest ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build(); ByteBuffer digestWithLegacyCounter0 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true); - assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0)); + assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0)); // truncate, then re-insert the same partition, but this time with a legacy // shard having the value 1. The repaired digest should match the previous, as @@ -713,11 +730,254 @@ public class ReadCommandTest cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); ByteBuffer digestWithCounterCell = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true); - assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithCounterCell)); + assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithCounterCell)); assertFalse(digestWithLegacyCounter0.equals(digestWithCounterCell)); assertFalse(digestWithLegacyCounter1.equals(digestWithCounterCell)); } + /** + * Writes a single partition containing a single row and reads using a partition read. The single + * row includes 1 live simple column, 1 simple tombstone and 1 complex column with a complex + * deletion and a live cell. The repaired data digests generated by executing the same query + * before and after the tombstones become eligible for purging should not match each other. + * Also, neither digest should be empty as the partition is not made empty by the purging. + */ + @Test + public void purgeGCableTombstonesBeforeCalculatingDigest() throws Exception + { + KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(KEYSPACE); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF8); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + setGCGrace(cfs, 600); + + DecoratedKey[] keys = new DecoratedKey[] { Util.dk("key0"), Util.dk("key1"), Util.dk("key2"), Util.dk("key3") }; + int nowInSec = FBUtilities.nowInSeconds(); + TableMetadata cfm = cfs.metadata(); + + // A simple tombstone + new RowUpdateBuilder(cfs.metadata(), 0, keys[0]).clustering("cc").delete("a").build().apply(); + + // Collection with an associated complex deletion + PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0); + builder.row("cc").add("c", ImmutableSet.of("element1", "element2")); + builder.buildAsMutation().apply(); + + // RangeTombstone and a row (not covered by the RT). The row contains a regular tombstone which will not be + // purged. 
This is to prevent the partition from being fully purged and removed from the final results + new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, keys[2]).addRangeTombstone("aa", "bb").build().apply(); + new RowUpdateBuilder(cfs.metadata(), nowInSec+ 1000, 1000L, keys[2]).clustering("cc").delete("a").build().apply(); + + // Partition with 2 rows, one fully deleted + new RowUpdateBuilder(cfs.metadata.get(), 0, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply(); + RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply(); + cfs.forceBlockingFlush(); + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + + Map<DecoratedKey, ByteBuffer> digestsWithTombstones = new HashMap<>(); + //Tombstones are not yet purgable + for (DecoratedKey key : keys) + { + ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build(); + cmd.trackRepairedStatus(); + Partition partition = Util.getOnlyPartitionUnfiltered(cmd); + assertFalse(partition.isEmpty()); + partition.unfilteredIterator().forEachRemaining(u -> { + // must be either a RT, or a row containing some kind of deletion + assertTrue(u.isRangeTombstoneMarker() || ((Row)u).hasDeletion(cmd.nowInSec())); + }); + ByteBuffer digestWithTombstones = cmd.getRepairedDataDigest(); + // None should generate an empty digest + assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithTombstones); + digestsWithTombstones.put(key, digestWithTombstones); + } + + // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec + setGCGrace(cfs, 0); + + //Tombstones are now purgable, so won't be in the read results and produce different digests + for (DecoratedKey key : keys) + { + ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec + 1).build(); + cmd.trackRepairedStatus(); + Partition partition = Util.getOnlyPartitionUnfiltered(cmd); + assertFalse(partition.isEmpty()); + partition.unfilteredIterator().forEachRemaining(u -> { + // After purging, only rows without any deletions should remain. + // The one exception is "key2:cc" which has a regular column tombstone which is not + // eligible for purging. This is to prevent the partition from being fully purged + // when its RT is removed. 
+ assertTrue(u.isRow()); + Row r = (Row)u; + assertTrue(!r.hasDeletion(cmd.nowInSec()) + || (key.equals(keys[2]) && r.clustering() + .get(0) + .equals(AsciiType.instance.fromString("cc")))); + + }); + ByteBuffer digestWithoutTombstones = cmd.getRepairedDataDigest(); + // not an empty digest + assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithoutTombstones); + // should not match the pre-purge digest + assertDigestsDiffer(digestsWithTombstones.get(key), digestWithoutTombstones); + } + } + + private void setGCGrace(ColumnFamilyStore cfs, int gcGrace) + { + TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build(); + KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace); + Schema.instance.load( + keyspaceMetadata.withSwapped( + keyspaceMetadata.tables.withSwapped( + cfs.metadata().withSwapped(newParams)))); + } + + private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1) + { + assertTrue(ByteBufferUtil.compareUnsigned(b0, b1) != 0); + } + + @Test + public void partitionReadFullyPurged() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + ReadCommand partitionRead = Util.cmd(cfs, Util.dk("key")).build(); + fullyPurgedPartitionCreatesEmptyDigest(cfs, partitionRead); + } + + @Test + public void rangeReadFullyPurged() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + ReadCommand rangeRead = Util.cmd(cfs).build(); + fullyPurgedPartitionCreatesEmptyDigest(cfs, rangeRead); + } + + /** + * Writes a single partition containing only a single row deletion and reads with either a range or + * partition query. Before the row deletion is eligible for purging, it should appear in the query + * results and cause a non-empty repaired data digest to be generated. When the query is repeated after + * the row deletion becomes eligible for purging, both the result set and the repaired data digest should + * be empty. + */ + private void fullyPurgedPartitionCreatesEmptyDigest(ColumnFamilyStore cfs, ReadCommand command) throws Exception + { + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + setGCGrace(cfs, 600); + + // Partition with a single, fully deleted row + RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key"), "cc").apply(); + cfs.forceBlockingFlush(); + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + + command.trackRepairedStatus(); + List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command); + assertEquals(1, partitions.size()); + ByteBuffer digestWithTombstones = command.getRepairedDataDigest(); + assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithTombstones) != 0); + + // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec + setGCGrace(cfs, 0); + + AbstractReadCommandBuilder builder = command instanceof PartitionRangeReadCommand + ? Util.cmd(cfs) + : Util.cmd(cfs, Util.dk("key")); + builder.withNowInSeconds(command.nowInSec() + 60); + command = builder.build(); + command.trackRepairedStatus(); + + partitions = Util.getAllUnfiltered(command); + assertTrue(partitions.isEmpty()); + ByteBuffer digestWithoutTombstones = command.getRepairedDataDigest(); + assertEquals(0, ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutTombstones)); + }
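The properties these tests assert, an empty digest when everything repaired was purged and an unchanged digest when a fully purged partition joins the read, fall straight out of the buffer-and-commit sketch shown earlier after the RepairedDataInfo changes. A usage sketch (again using the invented PartitionDigestBuffer from that sketch, not patch code):

    class FullyPurgedExample
    {
        public static void main(String[] args) throws Exception
        {
            PartitionDigestBuffer info = new PartitionDigestBuffer();

            // a partition whose entire content was purged: nothing was tracked,
            // so closing it leaves the command-level digest untouched
            info.onPartitionClose();
            System.out.println(info.digest().length); // 0 -> empty digest

            // a partition with surviving repaired data does contribute
            info.trackUnpurged("key-0".getBytes());
            info.onPartitionClose();
            System.out.println(info.digest().length); // 16 (MD5)
        }
    }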
+ + /** + * Verifies that during range reads which include multiple partitions, fully purged partitions + * have no material effect on the calculated digest. This test writes two sstables, each containing + * a single partition; the first is live and the second fully deleted and eligible for purging. + * Initially, only the sstable containing the live partition is marked repaired, and a range read + * covering both partitions is performed to generate a digest. Then the sstable containing the + * purged partition is also marked repaired and the query re-executed. The digests produced by both + * queries should match as the digest calculation should exclude the fully purged partition. + */ + @Test + public void mixedPurgedAndNonPurgedPartitions() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + setGCGrace(cfs, 0); + + ReadCommand command = Util.cmd(cfs).withNowInSeconds(FBUtilities.nowInSeconds() + 60).build(); + + // Live partition in a repaired sstable, so included in the digest calculation + new RowUpdateBuilder(cfs.metadata.get(), 0, ByteBufferUtil.bytes("key-0")).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply(); + cfs.forceBlockingFlush(); + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + // Fully deleted partition in an unrepaired sstable, so not included in the initial digest + RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1"), "cc").apply(); + cfs.forceBlockingFlush(); + + command.trackRepairedStatus(); + List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command); + assertEquals(1, partitions.size()); + ByteBuffer digestWithoutPurgedPartition = command.getRepairedDataDigest(); + assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutPurgedPartition) != 0); + + // mark the sstable containing the purged partition as repaired, so both partitions are now + // read during the digest calculation. Because the purged partition is entirely + // discarded, the resultant digest should match the earlier one. + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + command = Util.cmd(cfs).withNowInSeconds(command.nowInSec()).build(); + command.trackRepairedStatus(); + + partitions = Util.getAllUnfiltered(command); + assertEquals(1, partitions.size()); + ByteBuffer digestWithPurgedPartition = command.getRepairedDataDigest(); + assertEquals(0, ByteBufferUtil.compareUnsigned(digestWithPurgedPartition, digestWithoutPurgedPartition)); + }
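The final test below leans on the fact that, after this patch, only the merged repaired iterators are wrapped with digest tracking in finalizeIterators, so unrepaired sstables can never feed the digest. A schematic of that shape (types here are simplified placeholders, not the patch's generics or actual signatures):

    import java.util.ArrayList;
    import java.util.List;

    class FinalizeSketch
    {
        interface Iter {}
        interface DigestTracker { Iter wrap(Iter merged); }

        static List<Iter> finalizeIterators(List<Iter> repaired, List<Iter> unrepaired, DigestTracker tracker)
        {
            if (repaired.isEmpty())
                return unrepaired;                  // no repaired data -> digest stays empty
            List<Iter> out = new ArrayList<>(unrepaired);
            out.add(tracker.wrap(merge(repaired))); // the digest sees repaired data only
            return out;
        }

        static Iter merge(List<Iter> iters) { return iters.get(0); } // placeholder for the real merge
    }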
+ + @Test + public void purgingConsidersRepairedDataOnly() throws Exception + { + // 2 sstables: the first is repaired and contains data that is all purgeable; + // the second is unrepaired and contains non-purgeable data. Even though + // the partition itself is not fully purged, the repaired data digest + // should be empty as there was no non-purgeable, repaired data read. + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + setGCGrace(cfs, 0); + + // Partition with a single, fully deleted row which will be fully purged + DecoratedKey key = Util.dk("key"); + RowUpdateBuilder.deleteRow(cfs.metadata(), 0, key, "cc").apply(); + cfs.forceBlockingFlush(); + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + + new RowUpdateBuilder(cfs.metadata(), 1, key).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply(); + cfs.forceBlockingFlush(); + + int nowInSec = FBUtilities.nowInSeconds() + 10; + ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build(); + cmd.trackRepairedStatus(); + Partition partition = Util.getOnlyPartitionUnfiltered(cmd); + assertFalse(partition.isEmpty()); + // check that only the live row from the unrepaired sstable survives the read + try (UnfilteredRowIterator rows = partition.unfilteredIterator()) + { + assertFalse(rows.isEmpty()); + Unfiltered unfiltered = rows.next(); + assertFalse(rows.hasNext()); + assertTrue(unfiltered.isRow()); + assertFalse(((Row) unfiltered).hasDeletion(nowInSec)); + } + assertEquals(EMPTY_BYTE_BUFFER, cmd.getRepairedDataDigest()); + } + private long readCount(SSTableReader sstable) { return sstable.getReadMeter().count(); @@ -833,7 +1093,7 @@ public class ReadCommandTest Set<ByteBuffer> digests = new HashSet<>(); // first time round, nothing has been marked repaired so we expect digest to be an empty buffer and to be marked conclusive ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true); - assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest); + assertEquals(EMPTY_BYTE_BUFFER, digest); digests.add(digest); // add a pending repair session to table1, digest should remain the same but now we expect it to be marked inconclusive @@ -872,12 +1132,12 @@ public class ReadCommandTest .delete() .build()).apply(); digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false); - assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest); + assertEquals(EMPTY_BYTE_BUFFER, digest); // now flush so we have an unrepaired table with the deletion and repeat the check cfs.forceBlockingFlush(); digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false); - assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest); + assertEquals(EMPTY_BYTE_BUFFER, digest); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org