Repository: cassandra Updated Branches: refs/heads/trunk 744973e4c -> 5fbb938ad
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java new file mode 100644 index 0000000..816fe9f --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.NoSpamLogger; + +public interface RepairedDataVerifier +{ + public void verify(RepairedDataTracker tracker); + + static RepairedDataVerifier simple(ReadCommand command) + { + return new SimpleVerifier(command); + } + + static class SimpleVerifier implements RepairedDataVerifier + { + private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class); + private final ReadCommand command; + + private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. 
{}"; + + SimpleVerifier(ReadCommand command) + { + this.command = command; + } + + @Override + public void verify(RepairedDataTracker tracker) + { + Tracing.trace("Verifying repaired data tracker {}", tracker); + + // some mismatch occurred between the repaired datasets on the replicas + if (tracker.digests.keySet().size() > 1) + { + // the mismatch is only marked as confirmed if none of the digests are inconclusive; + // a digest is inconclusive if there were pending repair sessions which had not yet + // been committed or unrepaired partition deletes which meant some sstables were + // skipped during reads, in which case the mismatch is at most reported as unconfirmed + if (tracker.inconclusiveDigests.isEmpty()) + { + TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id); + metrics.confirmedRepairedInconsistencies.mark(); + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, + INCONSISTENCY_WARNING, command.metadata().keyspace, + command.metadata().name, getCommandString(), tracker); + } + else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches()) + { + TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id); + metrics.unconfirmedRepairedInconsistencies.mark(); + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, + INCONSISTENCY_WARNING, command.metadata().keyspace, + command.metadata().name, getCommandString(), tracker); + } + } + } + + private String getCommandString() + { + return command instanceof SinglePartitionReadCommand + ? 
((SinglePartitionReadCommand)command).partitionKey().toString() + : ((PartitionRangeReadCommand)command).dataRange().keyRange().getString(command.metadata().partitionKeyType); + + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadCommandTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 5020b95..8df7651 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -20,10 +20,12 @@ package org.apache.cassandra.db; import java.io.IOException; import java.io.OutputStream; +import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -37,28 +39,36 @@ import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.partitions.FilteredPartition; -import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import 
org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.consistent.LocalSessionAccessor; +import org.apache.cassandra.schema.CachingParams; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class ReadCommandTest { @@ -68,6 +78,20 @@ public class ReadCommandTest private static final String CF3 = "Standard3"; private static final String CF4 = "Standard4"; private static final String CF5 = "Standard5"; + private static final String CF6 = "Standard6"; + + private static final InetAddressAndPort REPAIR_COORDINATOR; + static { + try + { + REPAIR_COORDINATOR = InetAddressAndPort.getByName("10.0.0.1"); + } + catch (UnknownHostException e) + { + + throw new AssertionError(e); + } + } @BeforeClass public static void defineSchema() throws ConfigurationException @@ -116,6 +140,14 @@ public class ReadCommandTest .addRegularColumn("e", AsciiType.instance) .addRegularColumn("f", AsciiType.instance); + TableMetadata.Builder metadata6 = + TableMetadata.builder(KEYSPACE, CF6) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + 
+ .addRegularColumn("a", AsciiType.instance) + .addRegularColumn("b", AsciiType.instance) + .caching(CachingParams.CACHE_EVERYTHING); + SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), @@ -123,7 +155,10 @@ public class ReadCommandTest metadata2, metadata3, metadata4, - metadata5); + metadata5, + metadata6); + + LocalSessionAccessor.startup(); } @Test @@ -540,4 +575,242 @@ public class ReadCommandTest assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); } + @Test + public void testSinglePartitionSliceRepairedDataTracking() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2); + ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build(); + testRepairedDataTracking(cfs, readCommand); + } + + @Test + public void testPartitionRangeRepairedDataTracking() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2); + ReadCommand readCommand = Util.cmd(cfs).build(); + testRepairedDataTracking(cfs, readCommand); + } + + @Test + public void testSinglePartitionNamesRepairedDataTracking() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2); + ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build(); + testRepairedDataTracking(cfs, readCommand); + } + + @Test + public void testSinglePartitionNamesSkipsOptimisationsIfTrackingRepairedData() + { + // when tracking, the optimizations of querying sstables in timestamp order and + // returning early once all requested columns have been seen are not available, so + // just assert that all sstables are read when performing such queries + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key")) + .clustering("dd") + .add("a", ByteBufferUtil.bytes("abcd")) + .build() + 
.apply(); + + cfs.forceBlockingFlush(); + + new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key")) + .clustering("dd") + .add("a", ByteBufferUtil.bytes("wxyz")) + .build() + .apply(); + + cfs.forceBlockingFlush(); + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + assertEquals(2, sstables.size()); + Collections.sort(sstables, SSTableReader.maxTimestampComparator); + + ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").columns("a").build(); + + assertEquals(0, readCount(sstables.get(0))); + assertEquals(0, readCount(sstables.get(1))); + ReadCommand withTracking = readCommand.copy(); + withTracking.trackRepairedStatus(); + Util.getAll(withTracking); + assertEquals(1, readCount(sstables.get(0))); + assertEquals(1, readCount(sstables.get(1))); + + // same command without tracking touches only the table with the higher timestamp + Util.getAll(readCommand.copy()); + assertEquals(2, readCount(sstables.get(0))); + assertEquals(1, readCount(sstables.get(1))); + } + + private long readCount(SSTableReader sstable) + { + return sstable.getReadMeter().count(); + } + + @Test + public void skipRowCacheIfTrackingRepairedData() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key")) + .clustering("cc") + .add("a", ByteBufferUtil.bytes("abcd")) + .build() + .apply(); + + cfs.forceBlockingFlush(); + + ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build(); + assertTrue(cfs.isRowCacheEnabled()); + // warm the cache + assertFalse(Util.getAll(readCommand).isEmpty()); + long cacheHits = cfs.metric.rowCacheHit.getCount(); + + Util.getAll(readCommand); + assertTrue(cfs.metric.rowCacheHit.getCount() > cacheHits); + cacheHits = cfs.metric.rowCacheHit.getCount(); + + ReadCommand withRepairedInfo = readCommand.copy(); + withRepairedInfo.trackRepairedStatus(); + 
Util.getAll(withRepairedInfo); + assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount()); + } + + private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand readCommand) throws IOException + { + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key")) + .clustering("cc") + .add("a", ByteBufferUtil.bytes("abcd")) + .build() + .apply(); + + cfs.forceBlockingFlush(); + + new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key")) + .clustering("dd") + .add("a", ByteBufferUtil.bytes("abcd")) + .build() + .apply(); + + cfs.forceBlockingFlush(); + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + assertEquals(2, sstables.size()); + sstables.forEach(sstable -> assertFalse(sstable.isRepaired() || sstable.isPendingRepair())); + SSTableReader sstable1 = sstables.get(0); + SSTableReader sstable2 = sstables.get(1); + + int numPartitions = 1; + int rowsPerPartition = 2; + + // Capture all the digest versions as we mutate the table's repaired status. Each time + // we make a change, we expect a different digest. 
+ Set<ByteBuffer> digests = new HashSet<>(); + // first time round, nothing has been marked repaired so we expect digest to be an empty buffer and to be marked conclusive + ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true); + assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest); + digests.add(digest); + + // add a pending repair session to table1, digest should remain the same but now we expect it to be marked inconclusive + UUID session1 = UUIDGen.getTimeUUID(); + mutateRepaired(cfs, sstable1, ActiveRepairService.UNREPAIRED_SSTABLE, session1); + digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false)); + assertEquals(1, digests.size()); + + // add a different pending session to table2, digest should remain the same and still consider it inconclusive + UUID session2 = UUIDGen.getTimeUUID(); + mutateRepaired(cfs, sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, session2); + digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false)); + assertEquals(1, digests.size()); + + // mark one table repaired + mutateRepaired(cfs, sstable1, 111, null); + // this time, digest should not be empty, session2 still means that the result is inconclusive + digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false)); + assertEquals(2, digests.size()); + + // mark the second table repaired + mutateRepaired(cfs, sstable2, 222, null); + // digest should be updated again and as there are no longer any pending sessions, it should be considered conclusive + digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true)); + assertEquals(3, digests.size()); + + // insert a partition tombstone into the memtable, then re-check the repaired info. 
+ // This is to ensure that the optimisations which skip reading from sstables + // when a newer partition tombstone has already been seen cause the digest to be marked + // as inconclusive. + // the exception to this case is for partition range reads, where we always read + // and generate digests for all sstables, so we only test this path for single partition reads + if (readCommand.isLimitedToOnePartition()) + { + new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes("key")) + .delete() + .build()).apply(); + digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false); + assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest); + + // now flush so we have an unrepaired table with the deletion and repeat the check + cfs.forceBlockingFlush(); + digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false); + assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest); + } + } + + private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession) throws IOException + { + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false); + sstable.reloadSSTableMetadata(); + if (pendingSession != null) + { + // setup a minimal repair session. 
This is necessary because we + // check for sessions which have exceeded timeout and been purged + Range<Token> range = new Range<>(cfs.metadata().partitioner.getMinimumToken(), + cfs.metadata().partitioner.getRandomToken()); + ActiveRepairService.instance.registerParentRepairSession(pendingSession, + REPAIR_COORDINATOR, + Lists.newArrayList(cfs), + Sets.newHashSet(range), + true, + repairedAt, + true, + PreviewKind.NONE); + + LocalSessionAccessor.prepareUnsafe(pendingSession, null, Sets.newHashSet(REPAIR_COORDINATOR)); + } + } + + private ByteBuffer performReadAndVerifyRepairedInfo(ReadCommand command, + int expectedPartitions, + int expectedRowsPerPartition, + boolean expectConclusive) + { + // perform equivalent read command multiple times and assert that + // the repaired data info is always consistent. Return the digest + // so we can verify that it changes when the repaired status of + // the queried tables does. + Set<ByteBuffer> digests = new HashSet<>(); + for (int i = 0; i < 10; i++) + { + ReadCommand withRepairedInfo = command.copy(); + withRepairedInfo.trackRepairedStatus(); + + List<FilteredPartition> partitions = Util.getAll(withRepairedInfo); + assertEquals(expectedPartitions, partitions.size()); + partitions.forEach(p -> assertEquals(expectedRowsPerPartition, p.rowCount())); + + ByteBuffer digest = withRepairedInfo.getRepairedDataDigest(); + digests.add(digest); + assertEquals(1, digests.size()); + assertEquals(expectConclusive, withRepairedInfo.isRepairedDataDigestConclusive()); + } + return digests.iterator().next(); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java new file mode 100644 index 0000000..0c43661 --- /dev/null +++ 
b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.net.UnknownHostException; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import com.google.common.collect.ImmutableMap; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.IMessageSink; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ParameterType; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertFalse; +import static 
org.junit.Assert.assertTrue; + +public class ReadCommandVerbHandlerTest +{ + private static final String TEST_NAME = "read_command_vh_test_"; + private static final String KEYSPACE = TEST_NAME + "cql_keyspace"; + private static final String TABLE = "table1"; + + private final Random random = new Random(); + private ReadCommandVerbHandler handler; + private TableMetadata metadata; + + @BeforeClass + public static void init() + { + SchemaLoader.loadSchema(); + SchemaLoader.schemaDefinition(TEST_NAME); + } + + @Before + public void setup() + { + MessagingService.instance().clearMessageSinks(); + MessagingService.instance().addMessageSink(new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to) + { + return false; + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + return false; + } + }); + + metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE); + handler = new ReadCommandVerbHandler(); + } + + @Test + public void setRepairedDataTrackingFlagIfHeaderPresent() + { + ReadCommand command = command(key()); + assertFalse(command.isTrackingRepairedStatus()); + Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACK_REPAIRED_DATA, + MessagingService.ONE_BYTE); + handler.doVerb(MessageIn.create(peer(), + command, + params, + MessagingService.Verb.READ, + MessagingService.current_version), + messageId()); + assertTrue(command.isTrackingRepairedStatus()); + } + + @Test + public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent() + { + ReadCommand command = command(key()); + assertFalse(command.isTrackingRepairedStatus()); + Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACE_SESSION, + UUID.randomUUID()); + handler.doVerb(MessageIn.create(peer(), + command, + params, + MessagingService.Verb.READ, + MessagingService.current_version), + messageId()); + assertFalse(command.isTrackingRepairedStatus()); + } + + @Test + public void 
dontSetRepairedDataTrackingFlagIfHeadersEmpty() + { + ReadCommand command = command(key()); + assertFalse(command.isTrackingRepairedStatus()); + handler.doVerb(MessageIn.create(peer(), + command, + ImmutableMap.of(), + MessagingService.Verb.READ, + MessagingService.current_version), + messageId()); + assertFalse(command.isTrackingRepairedStatus()); + } + + private int key() + { + return random.nextInt(); + } + + private int messageId() + { + return random.nextInt(); + } + + private InetAddressAndPort peer() + { + try + { + return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, 9}); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + private ReadCommand command(int key) + { + return new SinglePartitionReadCommand(false, + 0, + false, + metadata, + FBUtilities.nowInSeconds(), + ColumnFilter.all(metadata), + RowFilter.NONE, + DataLimits.NONE, + metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), + new ClusteringIndexSliceFilter(Slices.ALL, false), + null); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadResponseTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java new file mode 100644 index 0000000..6e1a804 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ReadResponseTest +{ + + private final Random random = new Random(); + private TableMetadata metadata; + + @Before + public void setup() + { + metadata = TableMetadata.builder("ks", "t1") + .addPartitionKeyColumn("p", Int32Type.instance) + .addRegularColumn("v", Int32Type.instance) + .partitioner(Murmur3Partitioner.instance) + .build(); + } + + @Test + public void fromCommandWithConclusiveRepairedDigest() + { + ByteBuffer digest = digest(); + ReadCommand command = command(key(), metadata, digest, true); + ReadResponse response = 
command.createResponse(EmptyIterators.unfilteredPartition(metadata)); + assertTrue(response.isRepairedDigestConclusive()); + assertEquals(digest, response.repairedDataDigest()); + verifySerDe(response); + } + + @Test + public void fromCommandWithInconclusiveRepairedDigest() + { + ByteBuffer digest = digest(); + ReadCommand command = command(key(), metadata, digest, false); + ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata)); + assertFalse(response.isRepairedDigestConclusive()); + assertEquals(digest, response.repairedDataDigest()); + verifySerDe(response); + } + + @Test + public void fromCommandWithConclusiveEmptyRepairedDigest() + { + ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata)); + assertTrue(response.isRepairedDigestConclusive()); + assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest()); + verifySerDe(response); + } + + @Test + public void fromCommandWithInconclusiveEmptyRepairedDigest() + { + ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); + ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata)); + assertFalse(response.isRepairedDigestConclusive()); + assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest()); + verifySerDe(response); + } + + /* + * Digest responses should never include repaired data tracking as we only request + * it in read repair or for range queries + */ + @Test (expected = UnsupportedOperationException.class) + public void digestResponseErrorsIfRepairedDataDigestRequested() + { + ReadCommand command = digestCommand(key(), metadata); + ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata)); + assertTrue(response.isDigestResponse()); + assertFalse(response.mayIncludeRepairedDigest()); + 
response.repairedDataDigest(); + } + + @Test (expected = UnsupportedOperationException.class) + public void digestResponseErrorsIfIsConclusiveRequested() + { + ReadCommand command = digestCommand(key(), metadata); + ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata)); + assertTrue(response.isDigestResponse()); + assertFalse(response.mayIncludeRepairedDigest()); + response.isRepairedDigestConclusive(); + } + + @Test (expected = UnsupportedOperationException.class) + public void digestResponseErrorsIfIteratorRequested() + { + ReadCommand command = digestCommand(key(), metadata); + ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata)); + assertTrue(response.isDigestResponse()); + assertFalse(response.mayIncludeRepairedDigest()); + response.makeIterator(command); + } + + @Test + public void makeDigestDoesntConsiderRepairedDataInfo() + { + // It shouldn't be possible to get false positive DigestMismatchExceptions based + // on differing repaired data tracking info because it isn't requested on initial + // requests, only following a digest mismatch. Having a test doesn't hurt though + int key = key(); + ByteBuffer digest1 = digest(); + ReadCommand command1 = command(key, metadata, digest1, true); + ReadResponse response1 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata)); + + ByteBuffer digest2 = digest(); + ReadCommand command2 = command(key, metadata, digest2, false); + ReadResponse response2 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata)); + + assertEquals(response1.digest(command1), response2.digest(command2)); + } + + private void verifySerDe(ReadResponse response) { + // check that roundtripping through ReadResponse.serializer behaves as expected. 
+ // ReadResponses from pre-4.0 nodes will never contain repaired data digest + // or pending session info, but we run all messages through both pre/post 4.0 + // serde to check that the defaults are correctly applied + roundTripSerialization(response, MessagingService.current_version); + roundTripSerialization(response, MessagingService.VERSION_30); + + } + + private void roundTripSerialization(ReadResponse response, int version) + { + try + { + DataOutputBuffer out = new DataOutputBuffer(); + ReadResponse.serializer.serialize(response, out, version); + + DataInputBuffer in = new DataInputBuffer(out.buffer(), false); + ReadResponse deser = ReadResponse.serializer.deserialize(in, version); + if (version < MessagingService.VERSION_40) + { + assertFalse(deser.mayIncludeRepairedDigest()); + // even though that means they should never be used, verify that the default values are present + assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, deser.repairedDataDigest()); + assertTrue(deser.isRepairedDigestConclusive()); + } + else + { + assertTrue(deser.mayIncludeRepairedDigest()); + assertEquals(response.repairedDataDigest(), deser.repairedDataDigest()); + assertEquals(response.isRepairedDigestConclusive(), deser.isRepairedDigestConclusive()); + } + } + catch (IOException e) + { + fail("Caught unexpected IOException during SerDe: " + e.getMessage()); + } + } + + + private int key() + { + return random.nextInt(); + } + + private ByteBuffer digest() + { + byte[] bytes = new byte[4]; + random.nextBytes(bytes); + return ByteBuffer.wrap(bytes); + } + + private ReadCommand digestCommand(int key, TableMetadata metadata) + { + return new StubReadCommand(key, metadata, true, ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + } + + private ReadCommand command(int key, TableMetadata metadata, ByteBuffer repairedDigest, boolean conclusive) + { + return new StubReadCommand(key, metadata, false, repairedDigest, conclusive); + } + + private static class StubReadCommand extends 
SinglePartitionReadCommand + { + + private final ByteBuffer repairedDigest; + private final boolean conclusive; + + StubReadCommand(int key, TableMetadata metadata, + boolean isDigest, + final ByteBuffer repairedDigest, + final boolean conclusive) + { + super(isDigest, + 0, + false, + metadata, + FBUtilities.nowInSeconds(), + ColumnFilter.all(metadata), + RowFilter.NONE, + DataLimits.NONE, + metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), + null, + null); + this.repairedDigest = repairedDigest; + this.conclusive = conclusive; + } + + @Override + public ByteBuffer getRepairedDataDigest() + { + return repairedDigest; + } + + @Override + public boolean isRepairedDataDigestConclusive() + { + return conclusive; + } + + public UnfilteredPartitionIterator executeLocally(ReadExecutionController controller) + { + return EmptyIterators.unfilteredPartition(this.metadata()); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java index c6f2232..582aff8 100644 --- a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java +++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.reads; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.Iterator; import java.util.SortedSet; @@ -214,15 +215,38 @@ public abstract class AbstractReadResponseTest return dk(Integer.toString(k)); } - static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean digest) + + static MessageIn<ReadResponse> response(ReadCommand command, + 
InetAddressAndPort from, + UnfilteredPartitionIterator data, + boolean isDigestResponse, + int fromVersion, + ByteBuffer repairedDataDigest, + boolean hasPendingRepair) + { + ReadResponse response = isDigestResponse + ? ReadResponse.createDigestResponse(data, command) + : ReadResponse.createRemoteDataResponse(data, repairedDataDigest, hasPendingRepair, command, fromVersion); + return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, fromVersion); + } + + static MessageIn<ReadResponse> response(InetAddressAndPort from, + UnfilteredPartitionIterator partitionIterator, + ByteBuffer repairedDigest, + boolean hasPendingRepair, + ReadCommand cmd) + { + return response(cmd, from, partitionIterator, false, MessagingService.current_version, repairedDigest, hasPendingRepair); + } + + static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean isDigestResponse) { - ReadResponse response = digest ? 
ReadResponse.createDigestResponse(data, command) : ReadResponse.createDataResponse(data, command); - return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, MessagingService.current_version); + // Forward the caller's flag in the isDigestResponse position (it used to land in the trailing repaired-data flag, so a digest response was never built); + // no repaired data digest is supplied here, so the trailing flag is false, as in the 3-argument overload below + return response(command, from, data, isDigestResponse, MessagingService.current_version, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); } static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data) { - return response(command, from, data, false); + return response(command, from, data, false, MessagingService.current_version, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); } public RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java index abec25d..968ef16 100644 --- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java @@ -55,8 +55,13 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.ReplicaUtils; +import org.apache.cassandra.net.*; +import org.apache.cassandra.service.reads.repair.ReadRepair; +import org.apache.cassandra.service.reads.repair.RepairedDataTracker; +import org.apache.cassandra.service.reads.repair.RepairedDataVerifier; import org.apache.cassandra.service.reads.repair.TestableReadRepair; import 
org.apache.cassandra.utils.ByteBufferUtil; @@ -81,6 +86,7 @@ public class DataResolverTest extends AbstractReadResponseTest public void setup() { command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build(); + command.trackRepairedStatus(); readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); } @@ -586,6 +592,7 @@ public class DataResolverTest extends AbstractReadResponseTest public void testRepairRangeTombstoneWithPartitionDeletion() { EndpointsForRange replicas = makeReplicas(2); + DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); InetAddressAndPort peer1 = replicas.get(0).endpoint(); InetAddressAndPort peer2 = replicas.get(1).endpoint(); @@ -898,6 +905,347 @@ public class DataResolverTest extends AbstractReadResponseTest Assert.assertNull(readRepair.sent.get(peer2)); } + /** Tests for repaired data tracking */ + + @Test + public void trackMatchingEmptyDigestsWithAllConclusive() + { + EndpointsForRange replicas = makeReplicas(2); + ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER; + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, true); + verifier.expectDigest(peer2, digest1, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMatchingEmptyDigestsWithSomeConclusive() + { + ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER; + EndpointsForRange replicas = makeReplicas(2); + InetAddressAndPort 
peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, false); + verifier.expectDigest(peer2, digest1, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMatchingEmptyDigestsWithNoneConclusive() + { + ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER; + EndpointsForRange replicas = makeReplicas(2); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, false); + verifier.expectDigest(peer2, digest1, false); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMatchingDigestsWithAllConclusive() + { + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + EndpointsForRange replicas = makeReplicas(2); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, true); + 
verifier.expectDigest(peer2, digest1, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMatchingDigestsWithSomeConclusive() + { + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + EndpointsForRange replicas = makeReplicas(2); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, true); + verifier.expectDigest(peer2, digest1, false); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMatchingDigestsWithNoneConclusive() + { + EndpointsForRange replicas = makeReplicas(2); + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, false); + verifier.expectDigest(peer2, digest1, false); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, 
iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMatchingRepairedDigestsWithDifferentData() + { + // As far as repaired data tracking is concerned, the actual data in the response is not relevant + EndpointsForRange replicas = makeReplicas(2); + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, true); + verifier.expectDigest(peer2, digest1, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") .buildUpdate()), digest1, true, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMismatchingRepairedDigestsWithAllConclusive() + { + EndpointsForRange replicas = makeReplicas(2); + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + ByteBuffer digest2 = ByteBufferUtil.bytes("digest2"); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, true); + verifier.expectDigest(peer2, digest2, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, 
iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMismatchingRepairedDigestsWithSomeConclusive() + { + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + ByteBuffer digest2 = ByteBufferUtil.bytes("digest2"); + EndpointsForRange replicas = makeReplicas(2); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, false); + verifier.expectDigest(peer2, digest2, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMismatchingRepairedDigestsWithNoneConclusive() + { + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + ByteBuffer digest2 = ByteBufferUtil.bytes("digest2"); + EndpointsForRange replicas = makeReplicas(2); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, false); + verifier.expectDigest(peer2, digest2, false); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command)); + 
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, false, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void trackMismatchingRepairedDigestsWithDifferentData() + { + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + ByteBuffer digest2 = ByteBufferUtil.bytes("digest2"); + EndpointsForRange replicas = makeReplicas(2); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, true); + verifier.expectDigest(peer2, digest2, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") .buildUpdate()), digest1, true, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void noVerificationForSingletonResponse() + { + // for CL <= 1 a coordinator shouldn't request repaired data tracking but we + // can easily assert that the verification isn't attempted even if it did + EndpointsForRange replicas = makeReplicas(2); + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + verifier.expectDigest(peer1, digest1, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command)); + + resolveAndConsume(resolver); + assertFalse(verifier.verified); + } + + @Test + public void 
responsesFromOlderVersionsAreNotTracked() + { + // In a mixed version cluster, responses from replicas running older versions won't include + // tracking info, so the digest and pending session status are defaulted. To make sure these + // default values don't result in false positives we make sure not to consider them when + // processing in DataResolver + EndpointsForRange replicas = makeReplicas(2); + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); + ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + verifier.expectDigest(peer1, digest1, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command)); + // peer2 is advertising an older version, so when we deserialize its response there are two things to note: + // i) the actual serialized response cannot contain any tracking info so deserialization will use defaults of + // an empty digest and pending sessions = false + // ii) under normal circumstances, this would cause a mismatch with peer1, but because of the older version, + // here it will not + resolver.preprocess(response(command, peer2, iter(PartitionUpdate.emptyUpdate(cfm,dk)), + false, MessagingService.VERSION_30, + ByteBufferUtil.EMPTY_BYTE_BUFFER, false)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + @Test + public void responsesFromTransientReplicasAreNotTracked() + { + EndpointsForRange replicas = makeReplicas(2); + EndpointsForRange.Mutable mutable = replicas.newMutable(2); + mutable.add(replicas.get(0)); + mutable.add(Replica.transientReplica(replicas.get(1).endpoint(), replicas.range())); + replicas = mutable.asSnapshot(); + + TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); 
+ ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); + ByteBuffer digest2 = ByteBufferUtil.bytes("digest2"); + InetAddressAndPort peer1 = replicas.get(0).endpoint(); + InetAddressAndPort peer2 = replicas.get(1).endpoint(); + verifier.expectDigest(peer1, digest1, true); + + DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier); + + resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command)); + resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest2, true, command)); + + resolveAndConsume(resolver); + assertTrue(verifier.verified); + } + + private static class TestRepairedDataVerifier implements RepairedDataVerifier + { + private final RepairedDataTracker expected = new RepairedDataTracker(null); + private boolean verified = false; + + private void expectDigest(InetAddressAndPort from, ByteBuffer digest, boolean conclusive) + { + expected.recordDigest(from, digest, conclusive); + } + + @Override + public void verify(RepairedDataTracker tracker) + { + verified = expected.equals(tracker); + } + } + + private DataResolver resolverWithVerifier(final ReadCommand command, + final ReplicaLayout.ForRange plan, + final ReadRepair readRepair, + final long queryStartNanoTime, + final RepairedDataVerifier verifier) + { + class TestableDataResolver extends DataResolver + { + + public TestableDataResolver(ReadCommand command, ReplicaLayout.ForRange plan, ReadRepair readRepair, long queryStartNanoTime) + { + super(command, plan, readRepair, queryStartNanoTime); + } + + protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command) + { + return verifier; + } + } + + return new TestableDataResolver(command, plan, readRepair, queryStartNanoTime); + } + private void assertRepairContainsDeletions(Mutation mutation, DeletionTime deletionTime, RangeTombstone...rangeTombstones) @@ -954,4 +1302,19 @@ public class 
DataResolverTest extends AbstractReadResponseTest { return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas); } + + private static void resolveAndConsume(DataResolver resolver) + { + try (PartitionIterator iterator = resolver.resolve()) + { + while (iterator.hasNext()) + { + try (RowIterator partition = iterator.next()) + { + while (partition.hasNext()) + partition.next(); + } + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java index a574d02..a4b7615 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java @@ -93,10 +93,10 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest ReadCallback readCallback = null; @Override - void sendReadCommand(InetAddressAndPort to, ReadCallback callback) + void sendReadCommand(Replica to, ReadCallback callback) { assert readCallback == null || readCallback == callback; - readCommandRecipients.add(to); + readCommandRecipients.add(to.endpoint()); readCallback = callback; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java index c345ee6..a5efe27 100644 --- 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java @@ -141,7 +141,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest Assert.assertNotNull(e.toMap()); } - void sendReadCommand(InetAddressAndPort to, ReadCallback callback) + void sendReadCommand(Replica to, ReadCallback callback) { assert readCallback == null || readCallback == callback; readCallback = callback; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java index 9bb7eed..bee5ddd 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java @@ -30,6 +30,7 @@ import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.service.reads.ReadCallback; @@ -46,10 +47,10 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest ReadCallback readCallback = null; @Override - void sendReadCommand(InetAddressAndPort to, ReadCallback callback) + void sendReadCommand(Replica to, ReadCallback callback) { assert readCallback == null || readCallback == callback; - readCommandRecipients.add(to); + readCommandRecipients.add(to.endpoint()); readCallback = callback; } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java new file mode 100644 index 0000000..c916d13 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.repair; + +import java.net.UnknownHostException; +import java.util.Random; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; + +public class RepairedDataVerifierTest +{ + private static final String TEST_NAME = "read_command_vh_test_"; + private static final String KEYSPACE = TEST_NAME + "cql_keyspace"; + private static final String TABLE = "table1"; + + private final Random random = new Random(); + private TableMetadata metadata; + private TableMetrics metrics; + + // counter to generate the last byte of peer addresses + private int addressSuffix = 10; + + @BeforeClass + public static void init() + { + SchemaLoader.loadSchema(); + SchemaLoader.schemaDefinition(TEST_NAME); + DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true); + } + + @Before + public void setup() + { + metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE); + metrics = ColumnFamilyStore.metricsFor(metadata.id); + } + + @Test + public void repairedDataMismatchWithSomeConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + 
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount + 1 , unconfirmedCount()); + } + + @Test + public void repairedDataMismatchWithNoneConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount + 1 , unconfirmedCount()); + } + + @Test + public void repairedDataMismatchWithAllConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true); + + tracker.verify(); + assertEquals(confirmedCount + 1, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void repairedDataMatchesWithAllConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + 
InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), true); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void repairedDataMatchesWithSomeConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void repairedDataMatchesWithNoneConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void allEmptyDigestWithAllConclusive() + { + // if a read didn't touch any repaired sstables, digests will be empty + long confirmedCount = 
confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void allEmptyDigestsWithSomeConclusive() + { + // if a read didn't touch any repaired sstables, digests will be empty + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void allEmptyDigestsWithNoneConclusive() + { + // if a read didn't touch any repaired sstables, digests will be empty + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); + tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); + + tracker.verify(); + assertEquals(confirmedCount, 
confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void noTrackingDataRecorded() + { + // if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + private long confirmedCount() + { + return metrics.confirmedRepairedInconsistencies.table.getCount(); + } + + private long unconfirmedCount() + { + return metrics.unconfirmedRepairedInconsistencies.table.getCount(); + } + + private InetAddressAndPort peer() + { + try + { + return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ }); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + private int key() + { + return random.nextInt(); + } + + private ReadCommand command(int key) + { + return new StubReadCommand(key, metadata, false); + } + + private static class StubReadCommand extends SinglePartitionReadCommand + { + StubReadCommand(int key, TableMetadata metadata, boolean isDigest) + { + super(isDigest, + 0, + false, + metadata, + FBUtilities.nowInSeconds(), + ColumnFilter.all(metadata), + RowFilter.NONE, + DataLimits.NONE, + metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), + null, + null); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org