Repository: cassandra
Updated Branches:
  refs/heads/trunk 744973e4c -> 5fbb938ad


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
new file mode 100644
index 0000000..816fe9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+public interface RepairedDataVerifier
+{
+    public void verify(RepairedDataTracker tracker);
+
+    static RepairedDataVerifier simple(ReadCommand command)
+    {
+        return new SimpleVerifier(command);
+    }
+
+    static class SimpleVerifier implements RepairedDataVerifier
+    {
+        private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class);
+        private final ReadCommand command;
+
+        private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. {}";
+
+        SimpleVerifier(ReadCommand command)
+        {
+            this.command = command;
+        }
+
+        @Override
+        public void verify(RepairedDataTracker tracker)
+        {
+            Tracing.trace("Verifying repaired data tracker {}", tracker);
+
+            // some mismatch occurred between the repaired datasets on the replicas
+            if (tracker.digests.keySet().size() > 1)
+            {
+                // if any of the digests should be considered inconclusive, because there were
+                // pending repair sessions which had not yet been committed or unrepaired partition
+                // deletes which meant some sstables were skipped during reads, mark the
+                // inconsistency as unconfirmed
+                if (tracker.inconclusiveDigests.isEmpty())
+                {
+                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
+                    metrics.confirmedRepairedInconsistencies.mark();
+                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
+                                     command.metadata().name, getCommandString(), tracker);
+                }
+                else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
+                {
+                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
+                    metrics.unconfirmedRepairedInconsistencies.mark();
+                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
+                                     command.metadata().name, getCommandString(), tracker);
+                }
+            }
+        }
+
+        private String getCommandString()
+        {
+            return command instanceof SinglePartitionReadCommand
+                   ? ((SinglePartitionReadCommand)command).partitionKey().toString()
+                   : ((PartitionRangeReadCommand)command).dataRange().keyRange().getString(command.metadata().partitionKeyType);
+        }
+    }
+}
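
For reference, the branch in SimpleVerifier.verify() above reduces to a small
decision rule: a mismatch exists when replicas report more than one distinct
repaired-data digest, and it is "confirmed" only when no replica's digest was
flagged inconclusive. A standalone illustration of that rule (hypothetical class,
not part of this commit; RepairedDataTracker itself is not shown in this patch):

    import java.nio.ByteBuffer;
    import java.util.HashSet;
    import java.util.Set;

    public class VerifierDecisionSketch
    {
        public static void main(String[] args)
        {
            // distinct digests reported by replicas, as in tracker.digests.keySet()
            Set<ByteBuffer> digests = new HashSet<>();
            digests.add(ByteBuffer.wrap(new byte[]{ 1 })); // replica A
            digests.add(ByteBuffer.wrap(new byte[]{ 2 })); // replica B

            // true when any replica had a pending repair session or skipped
            // sstables, i.e. tracker.inconclusiveDigests would be non-empty
            boolean anyInconclusive = true;

            if (digests.size() > 1)
                System.out.println(anyInconclusive ? "unconfirmed mismatch"
                                                   : "confirmed mismatch");
            else
                System.out.println("repaired datasets match");
        }
    }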

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 5020b95..8df7651 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -37,28 +39,36 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.partitions.FilteredPartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class ReadCommandTest
 {
@@ -68,6 +78,20 @@ public class ReadCommandTest
     private static final String CF3 = "Standard3";
     private static final String CF4 = "Standard4";
     private static final String CF5 = "Standard5";
+    private static final String CF6 = "Standard6";
+
+    private static final InetAddressAndPort REPAIR_COORDINATOR;
+    static {
+        try
+        {
+            REPAIR_COORDINATOR = InetAddressAndPort.getByName("10.0.0.1");
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
@@ -116,6 +140,14 @@ public class ReadCommandTest
                      .addRegularColumn("e", AsciiType.instance)
                      .addRegularColumn("f", AsciiType.instance);
 
+        TableMetadata.Builder metadata6 =
+        TableMetadata.builder(KEYSPACE, CF6)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .caching(CachingParams.CACHE_EVERYTHING);
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
@@ -123,7 +155,10 @@ public class ReadCommandTest
                                     metadata2,
                                     metadata3,
                                     metadata4,
-                                    metadata5);
+                                    metadata5,
+                                    metadata6);
+
+        LocalSessionAccessor.startup();
     }
 
     @Test
@@ -540,4 +575,242 @@ public class ReadCommandTest
         assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
     }
 
+    @Test
+    public void testSinglePartitionSliceRepairedDataTracking() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+        testRepairedDataTracking(cfs, readCommand);
+    }
+
+    @Test
+    public void testPartitionRangeRepairedDataTracking() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+        ReadCommand readCommand = Util.cmd(cfs).build();
+        testRepairedDataTracking(cfs, readCommand);
+    }
+
+    @Test
+    public void testSinglePartitionNamesRepairedDataTracking() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
+        testRepairedDataTracking(cfs, readCommand);
+    }
+
+    @Test
+    public void testSinglePartitionNamesSkipsOptimisationsIfTrackingRepairedData()
+    {
+        // when tracking repaired status, the optimizations of querying sstables in
+        // timestamp order and returning once all requested columns are fetched are
+        // not available, so just assert that all sstables are read when performing
+        // such queries
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+            .clustering("dd")
+            .add("a", ByteBufferUtil.bytes("abcd"))
+            .build()
+            .apply();
+
+        cfs.forceBlockingFlush();
+
+        new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
+            .clustering("dd")
+            .add("a", ByteBufferUtil.bytes("wxyz"))
+            .build()
+            .apply();
+
+        cfs.forceBlockingFlush();
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        assertEquals(2, sstables.size());
+        Collections.sort(sstables, SSTableReader.maxTimestampComparator);
+
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").columns("a").build();
+
+        assertEquals(0, readCount(sstables.get(0)));
+        assertEquals(0, readCount(sstables.get(1)));
+        ReadCommand withTracking = readCommand.copy();
+        withTracking.trackRepairedStatus();
+        Util.getAll(withTracking);
+        assertEquals(1, readCount(sstables.get(0)));
+        assertEquals(1, readCount(sstables.get(1)));
+
+        // the same command without tracking touches only the sstable with the higher timestamp
+        Util.getAll(readCommand.copy());
+        assertEquals(2, readCount(sstables.get(0)));
+        assertEquals(1, readCount(sstables.get(1)));
+    }
+
+    private long readCount(SSTableReader sstable)
+    {
+        return sstable.getReadMeter().count();
+    }
+
+    @Test
+    public void skipRowCacheIfTrackingRepairedData()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+                .clustering("cc")
+                .add("a", ByteBufferUtil.bytes("abcd"))
+                .build()
+                .apply();
+
+        cfs.forceBlockingFlush();
+
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+        assertTrue(cfs.isRowCacheEnabled());
+        // warm the cache
+        assertFalse(Util.getAll(readCommand).isEmpty());
+        long cacheHits = cfs.metric.rowCacheHit.getCount();
+
+        Util.getAll(readCommand);
+        assertTrue(cfs.metric.rowCacheHit.getCount() > cacheHits);
+        cacheHits = cfs.metric.rowCacheHit.getCount();
+
+        ReadCommand withRepairedInfo = readCommand.copy();
+        withRepairedInfo.trackRepairedStatus();
+        Util.getAll(withRepairedInfo);
+        assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
+    }
+
+    private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand readCommand) throws IOException
+    {
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+                .clustering("cc")
+                .add("a", ByteBufferUtil.bytes("abcd"))
+                .build()
+                .apply();
+
+        cfs.forceBlockingFlush();
+
+        new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
+                .clustering("dd")
+                .add("a", ByteBufferUtil.bytes("abcd"))
+                .build()
+                .apply();
+
+        cfs.forceBlockingFlush();
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        assertEquals(2, sstables.size());
+        sstables.forEach(sstable -> assertFalse(sstable.isRepaired() || sstable.isPendingRepair()));
+        SSTableReader sstable1 = sstables.get(0);
+        SSTableReader sstable2 = sstables.get(1);
+
+        int numPartitions = 1;
+        int rowsPerPartition = 2;
+
+        // Capture all the digest versions as we mutate the table's repaired status. Each time
+        // we make a change, we expect a different digest.
+        Set<ByteBuffer> digests = new HashSet<>();
+        // first time round, nothing has been marked repaired so we expect digest to be an
+        // empty buffer and to be marked conclusive
+        ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true);
+        assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+        digests.add(digest);
+
+        // add a pending repair session to sstable1, digest should remain the same but
+        // now we expect it to be marked inconclusive
+        UUID session1 = UUIDGen.getTimeUUID();
+        mutateRepaired(cfs, sstable1, ActiveRepairService.UNREPAIRED_SSTABLE, session1);
+        digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+        assertEquals(1, digests.size());
+
+        // add a different pending session to sstable2, digest should remain the same
+        // and still be considered inconclusive
+        UUID session2 = UUIDGen.getTimeUUID();
+        mutateRepaired(cfs, sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, session2);
+        digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+        assertEquals(1, digests.size());
+
+        // mark one sstable repaired
+        mutateRepaired(cfs, sstable1, 111, null);
+        // this time the digest should not be empty, but session2 still means that the result is inconclusive
+        digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+        assertEquals(2, digests.size());
+
+        // mark the second sstable repaired
+        mutateRepaired(cfs, sstable2, 222, null);
+        // digest should be updated again and, as there are no longer any pending sessions, it should be considered conclusive
+        digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true));
+        assertEquals(3, digests.size());
+
+        // insert a partition tombstone into the memtable, then re-check the repaired info.
+        // This is to ensure that the optimisations which skip reading from sstables when a
+        // newer partition tombstone has already been encountered cause the digest to be
+        // marked as inconclusive.
+        // The exception to this case is partition range reads, where we always read from
+        // and generate digests for all sstables, so we only test this path for single
+        // partition reads
+        if (readCommand.isLimitedToOnePartition())
+        {
+            new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes("key"))
+                                        .delete()
+                                        .build()).apply();
+            digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
+            assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+
+            // now flush so we have an unrepaired sstable containing the deletion and repeat the check
+            cfs.forceBlockingFlush();
+            digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
+            assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+        }
+    }
+
+    private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession) throws IOException
+    {
+        sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false);
+        sstable.reloadSSTableMetadata();
+        if (pendingSession != null)
+        {
+            // setup a minimal repair session. This is necessary because we
+            // check for sessions which have exceeded timeout and been purged
+            Range<Token> range = new Range<>(cfs.metadata().partitioner.getMinimumToken(),
+                                             cfs.metadata().partitioner.getRandomToken());
+            ActiveRepairService.instance.registerParentRepairSession(pendingSession,
+                                                                     REPAIR_COORDINATOR,
+                                                                     Lists.newArrayList(cfs),
+                                                                     Sets.newHashSet(range),
+                                                                     true,
+                                                                     repairedAt,
+                                                                     true,
+                                                                     PreviewKind.NONE);
+
+            LocalSessionAccessor.prepareUnsafe(pendingSession, null, Sets.newHashSet(REPAIR_COORDINATOR));
+        }
+    }
+
+    private ByteBuffer performReadAndVerifyRepairedInfo(ReadCommand command,
+                                                        int expectedPartitions,
+                                                        int expectedRowsPerPartition,
+                                                        boolean expectConclusive)
+    {
+        // perform equivalent read command multiple times and assert that
+        // the repaired data info is always consistent. Return the digest
+        // so we can verify that it changes when the repaired status of
+        // the queried tables does.
+        Set<ByteBuffer> digests = new HashSet<>();
+        for (int i = 0; i < 10; i++)
+        {
+            ReadCommand withRepairedInfo = command.copy();
+            withRepairedInfo.trackRepairedStatus();
+
+            List<FilteredPartition> partitions = Util.getAll(withRepairedInfo);
+            assertEquals(expectedPartitions, partitions.size());
+            partitions.forEach(p -> assertEquals(expectedRowsPerPartition, p.rowCount()));
+
+            ByteBuffer digest = withRepairedInfo.getRepairedDataDigest();
+            digests.add(digest);
+            assertEquals(1, digests.size());
+            assertEquals(expectConclusive, withRepairedInfo.isRepairedDataDigestConclusive());
+        }
+        return digests.iterator().next();
+    }
+
 }
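
The tests above share one pattern worth noting: repaired-status tracking state
lives on the ReadCommand instance itself, which is why each execution copies the
command before enabling tracking. A minimal sketch of the pattern, assuming the
same test fixtures (Util, SchemaLoader) as ReadCommandTest; the helper name is
illustrative only:

    private ByteBuffer trackedDigest(ReadCommand command)
    {
        ReadCommand tracked = command.copy();    // tracking state is per-execution
        tracked.trackRepairedStatus();           // opt in before executing
        Util.getAll(tracked);                    // execute and drain the read
        return tracked.getRepairedDataDigest();  // populated once the read completes
    }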

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
new file mode 100644
index 0000000..0c43661
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ReadCommandVerbHandlerTest
+{
+    private static final String TEST_NAME = "read_command_vh_test_";
+    private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
+    private static final String TABLE = "table1";
+
+    private final Random random = new Random();
+    private ReadCommandVerbHandler handler;
+    private TableMetadata metadata;
+
+    @BeforeClass
+    public static void init()
+    {
+        SchemaLoader.loadSchema();
+        SchemaLoader.schemaDefinition(TEST_NAME);
+    }
+
+    @Before
+    public void setup()
+    {
+        MessagingService.instance().clearMessageSinks();
+        MessagingService.instance().addMessageSink(new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
+            {
+                return false;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return false;
+            }
+        });
+
+        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+        handler = new ReadCommandVerbHandler();
+    }
+
+    @Test
+    public void setRepairedDataTrackingFlagIfHeaderPresent()
+    {
+        ReadCommand command = command(key());
+        assertFalse(command.isTrackingRepairedStatus());
+        Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACK_REPAIRED_DATA,
+                                                            MessagingService.ONE_BYTE);
+        handler.doVerb(MessageIn.create(peer(),
+                                        command,
+                                        params,
+                                        MessagingService.Verb.READ,
+                                        MessagingService.current_version),
+                       messageId());
+        assertTrue(command.isTrackingRepairedStatus());
+    }
+
+    @Test
+    public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent()
+    {
+        ReadCommand command = command(key());
+        assertFalse(command.isTrackingRepairedStatus());
+        Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACE_SESSION,
+                                                            UUID.randomUUID());
+        handler.doVerb(MessageIn.create(peer(),
+                                        command,
+                                        params,
+                                        MessagingService.Verb.READ,
+                                        MessagingService.current_version),
+                       messageId());
+        assertFalse(command.isTrackingRepairedStatus());
+    }
+
+    @Test
+    public void dontSetRepairedDataTrackingFlagIfHeadersEmpty()
+    {
+        ReadCommand command = command(key());
+        assertFalse(command.isTrackingRepairedStatus());
+        handler.doVerb(MessageIn.create(peer(),
+                                        command,
+                                        ImmutableMap.of(),
+                                        MessagingService.Verb.READ,
+                                        MessagingService.current_version),
+                       messageId());
+        assertFalse(command.isTrackingRepairedStatus());
+    }
+
+    private int key()
+    {
+        return random.nextInt();
+    }
+
+    private int messageId()
+    {
+        return random.nextInt();
+    }
+
+    private InetAddressAndPort peer()
+    {
+        try
+        {
+            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, 9});
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ReadCommand command(int key)
+    {
+        return new SinglePartitionReadCommand(false,
+              0,
+              false,
+              metadata,
+              FBUtilities.nowInSeconds(),
+              ColumnFilter.all(metadata),
+              RowFilter.NONE,
+              DataLimits.NONE,
+              metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+              new ClusteringIndexSliceFilter(Slices.ALL, false),
+              null);
+    }
+
+}
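
These handler tests only exercise the receiving side: the replica enables tracking
if and only if the TRACK_REPAIRED_DATA parameter is present on the incoming message.
A sketch of the coordinator-side counterpart, attaching the parameter to an outbound
read (the MessageOut builder call is assumed from this era of trunk and is not shown
in this diff):

    // hypothetical coordinator-side code requesting repaired data tracking
    MessageOut<ReadCommand> message =
        command.createMessage()
               .withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);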

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
new file mode 100644
index 0000000..6e1a804
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ReadResponseTest
+{
+
+    private final Random random = new Random();
+    private TableMetadata metadata;
+
+    @Before
+    public void setup()
+    {
+        metadata = TableMetadata.builder("ks", "t1")
+                                .addPartitionKeyColumn("p", Int32Type.instance)
+                                .addRegularColumn("v", Int32Type.instance)
+                                .partitioner(Murmur3Partitioner.instance)
+                                .build();
+    }
+
+    @Test
+    public void fromCommandWithConclusiveRepairedDigest()
+    {
+        ByteBuffer digest = digest();
+        ReadCommand command = command(key(), metadata, digest, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isRepairedDigestConclusive());
+        assertEquals(digest, response.repairedDataDigest());
+        verifySerDe(response);
+    }
+
+    @Test
+    public void fromCommandWithInconclusiveRepairedDigest()
+    {
+        ByteBuffer digest = digest();
+        ReadCommand command = command(key(), metadata, digest, false);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertFalse(response.isRepairedDigestConclusive());
+        assertEquals(digest, response.repairedDataDigest());
+        verifySerDe(response);
+    }
+
+    @Test
+    public void fromCommandWithConclusiveEmptyRepairedDigest()
+    {
+        ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isRepairedDigestConclusive());
+        assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
+        verifySerDe(response);
+    }
+
+    @Test
+    public void fromCommandWithInconclusiveEmptyRepairedDigest()
+    {
+        ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertFalse(response.isRepairedDigestConclusive());
+        assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
+        verifySerDe(response);
+    }
+
+    /*
+     * Digest responses should never include repaired data tracking as we only
+     * request it in read repair or for range queries
+     */
+    @Test (expected = UnsupportedOperationException.class)
+    public void digestResponseErrorsIfRepairedDataDigestRequested()
+    {
+        ReadCommand command = digestCommand(key(), metadata);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isDigestResponse());
+        assertFalse(response.mayIncludeRepairedDigest());
+        response.repairedDataDigest();
+    }
+
+    @Test (expected = UnsupportedOperationException.class)
+    public void digestResponseErrorsIfIsConclusiveRequested()
+    {
+        ReadCommand command = digestCommand(key(), metadata);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isDigestResponse());
+        assertFalse(response.mayIncludeRepairedDigest());
+        response.isRepairedDigestConclusive();
+    }
+
+    @Test (expected = UnsupportedOperationException.class)
+    public void digestResponseErrorsIfIteratorRequested()
+    {
+        ReadCommand command = digestCommand(key(), metadata);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isDigestResponse());
+        assertFalse(response.mayIncludeRepairedDigest());
+        response.makeIterator(command);
+    }
+
+    @Test
+    public void makeDigestDoesntConsiderRepairedDataInfo()
+    {
+        // It shouldn't be possible to get false positive DigestMismatchExceptions based
+        // on differing repaired data tracking info because it isn't requested on initial
+        // requests, only following a digest mismatch. Having a test doesn't hurt though
+        int key = key();
+        ByteBuffer digest1 = digest();
+        ReadCommand command1 = command(key, metadata, digest1, true);
+        ReadResponse response1 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata));
+
+        ByteBuffer digest2 = digest();
+        ReadCommand command2 = command(key, metadata, digest2, false);
+        ReadResponse response2 = command2.createResponse(EmptyIterators.unfilteredPartition(metadata));
+
+        assertEquals(response1.digest(command1), response2.digest(command2));
+    }
+
+    private void verifySerDe(ReadResponse response)
+    {
+        // check that roundtripping through ReadResponse.serializer behaves as expected.
+        // ReadResponses from pre-4.0 nodes will never contain repaired data digest
+        // or pending session info, but we run all messages through both pre/post 4.0
+        // serde to check that the defaults are correctly applied
+        roundTripSerialization(response, MessagingService.current_version);
+        roundTripSerialization(response, MessagingService.VERSION_30);
+    }
+
+    private void roundTripSerialization(ReadResponse response, int version)
+    {
+        try
+        {
+            DataOutputBuffer out = new DataOutputBuffer();
+            ReadResponse.serializer.serialize(response, out, version);
+
+            DataInputBuffer in = new DataInputBuffer(out.buffer(), false);
+            ReadResponse deser = ReadResponse.serializer.deserialize(in, version);
+            if (version < MessagingService.VERSION_40)
+            {
+                assertFalse(deser.mayIncludeRepairedDigest());
+                // even though that means they should never be used, verify that the default values are present
+                assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, deser.repairedDataDigest());
+                assertTrue(deser.isRepairedDigestConclusive());
+            }
+            else
+            {
+                assertTrue(deser.mayIncludeRepairedDigest());
+                assertEquals(response.repairedDataDigest(), deser.repairedDataDigest());
+                assertEquals(response.isRepairedDigestConclusive(), deser.isRepairedDigestConclusive());
+            }
+        }
+        catch (IOException e)
+        {
+            fail("Caught unexpected IOException during SerDe: " + e.getMessage());
+        }
+    }
+
+    private int key()
+    {
+        return random.nextInt();
+    }
+
+    private ByteBuffer digest()
+    {
+        byte[] bytes = new byte[4];
+        random.nextBytes(bytes);
+        return ByteBuffer.wrap(bytes);
+    }
+
+    private ReadCommand digestCommand(int key, TableMetadata metadata)
+    {
+        return new StubReadCommand(key, metadata, true, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+    }
+
+    private ReadCommand command(int key, TableMetadata metadata, ByteBuffer repairedDigest, boolean conclusive)
+    {
+        return new StubReadCommand(key, metadata, false, repairedDigest, conclusive);
+    }
+
+    private static class StubReadCommand extends SinglePartitionReadCommand
+    {
+
+        private final ByteBuffer repairedDigest;
+        private final boolean conclusive;
+
+        StubReadCommand(int key, TableMetadata metadata,
+                        boolean isDigest,
+                        final ByteBuffer repairedDigest,
+                        final boolean conclusive)
+        {
+            super(isDigest,
+                  0,
+                  false,
+                  metadata,
+                  FBUtilities.nowInSeconds(),
+                  ColumnFilter.all(metadata),
+                  RowFilter.NONE,
+                  DataLimits.NONE,
+                  metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+                  null,
+                  null);
+            this.repairedDigest = repairedDigest;
+            this.conclusive = conclusive;
+        }
+
+        @Override
+        public ByteBuffer getRepairedDataDigest()
+        {
+            return repairedDigest;
+        }
+
+        @Override
+        public boolean isRepairedDataDigestConclusive()
+        {
+            return conclusive;
+        }
+
+        public UnfilteredPartitionIterator executeLocally(ReadExecutionController controller)
+        {
+            return EmptyIterators.unfilteredPartition(this.metadata());
+        }
+    }
+}
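
roundTripSerialization() above pins down the cross-version contract: the repaired
data fields travel only between 4.0+ peers, and pre-4.0 deserialization must fall
back to the defaults the test asserts. A hypothetical sketch of the version gate
(ReadResponse.serializer's actual body is not part of this diff):

    // assumed shape of the gate inside ReadResponse.serializer.serialize()
    if (version >= MessagingService.VERSION_40)
    {
        ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out);
        out.writeBoolean(response.isRepairedDigestConclusive());
    }
    // pre-4.0 peers receive neither field; deserialize() substitutes the defaults
    // asserted above: EMPTY_BYTE_BUFFER and conclusive == true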

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
index c6f2232..582aff8 100644
--- a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.reads;
 
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.SortedSet;
@@ -214,15 +215,38 @@ public abstract class AbstractReadResponseTest
         return dk(Integer.toString(k));
     }
 
-    static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean digest)
+
+    static MessageIn<ReadResponse> response(ReadCommand command,
+                                            InetAddressAndPort from,
+                                            UnfilteredPartitionIterator data,
+                                            boolean isDigestResponse,
+                                            int fromVersion,
+                                            ByteBuffer repairedDataDigest,
+                                            boolean hasPendingRepair)
+    {
+        ReadResponse response = isDigestResponse
+                                ? ReadResponse.createDigestResponse(data, command)
+                                : ReadResponse.createRemoteDataResponse(data, repairedDataDigest, hasPendingRepair, command, fromVersion);
+        return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, fromVersion);
+    }
+
+    static MessageIn<ReadResponse> response(InetAddressAndPort from,
+                                            UnfilteredPartitionIterator partitionIterator,
+                                            ByteBuffer repairedDigest,
+                                            boolean hasPendingRepair,
+                                            ReadCommand cmd)
+    {
+        return response(cmd, from, partitionIterator, false, MessagingService.current_version, repairedDigest, hasPendingRepair);
+    }
+
+    static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean isDigestResponse)
     {
-        ReadResponse response = digest ? ReadResponse.createDigestResponse(data, command) : ReadResponse.createDataResponse(data, command);
-        return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, MessagingService.current_version);
+        return response(command, from, data, isDigestResponse, MessagingService.current_version, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
     }
 
     static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data)
     {
-        return response(command, from, data, false);
+        return response(command, from, data, false, MessagingService.current_version, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
     }
 
     public RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
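
The new five-argument overload is what the DataResolverTest changes below lean on:
it builds a full data response carrying a repaired-data digest from a current-version
replica. A usage example lifted from those tests (peer1, command, cfm and dk are
test fixtures):

    resolver.preprocess(response(peer1,
                                 iter(PartitionUpdate.emptyUpdate(cfm, dk)),
                                 ByteBufferUtil.bytes("digest1"),
                                 true,     // flag forwarded to createRemoteDataResponse
                                 command));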

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index abec25d..968ef16 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -55,8 +55,13 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
+import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 import org.apache.cassandra.service.reads.repair.TestableReadRepair;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -81,6 +86,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     public void setup()
     {
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
+        command.trackRepairedStatus();
         readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
     }
 
@@ -586,6 +592,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     public void testRepairRangeTombstoneWithPartitionDeletion()
     {
         EndpointsForRange replicas = makeReplicas(2);
+
         DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
         InetAddressAndPort peer1 = replicas.get(0).endpoint();
         InetAddressAndPort peer2 = replicas.get(1).endpoint();
@@ -898,6 +905,347 @@ public class DataResolverTest extends AbstractReadResponseTest
         Assert.assertNull(readRepair.sent.get(peer2));
     }
 
+    /** Tests for repaired data tracking */
+
+    @Test
+    public void trackMatchingEmptyDigestsWithAllConclusive()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingEmptyDigestsWithSomeConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingEmptyDigestsWithNoneConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest1, false);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithAllConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithSomeConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest1, false);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithNoneConclusive()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest1, false);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingRepairedDigestsWithDifferentData()
+    {
+        // As far as repaired data tracking is concerned, the actual data in the response is not relevant
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1").buildUpdate()), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithAllConclusive()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest2, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithSomeConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest2, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithNoneConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest2, false);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, false, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithDifferentData()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest2, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1").buildUpdate()), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void noVerificationForSingletonResponse()
+    {
+        // for CL <= 1 a coordinator shouldn't request repaired data tracking but we
+        // can easily assert that the verification isn't attempted even if it did
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertFalse(verifier.verified);
+    }
+
+    @Test
+    public void responsesFromOlderVersionsAreNotTracked()
+    {
+        // In a mixed version cluster, responses from replicas running older versions won't include
+        // tracking info, so the digest and pending session status are defaulted. To make sure these
+        // default values don't result in false positives, DataResolver must not consider them when
+        // processing responses
+        EndpointsForRange replicas = makeReplicas(2);
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        verifier.expectDigest(peer1, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        // peer2 is advertising an older version, so when we deserialize its response there are two things to note:
+        // i) the actual serialized response cannot contain any tracking info, so deserialization will use defaults of
+        //    an empty digest and pending sessions = false
+        // ii) under normal circumstances, this would cause a mismatch with peer1, but because of the older version,
+        //     here it will not
+        resolver.preprocess(response(command, peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)),
+                                     false, MessagingService.VERSION_30,
+                                     ByteBufferUtil.EMPTY_BYTE_BUFFER, false));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void responsesFromTransientReplicasAreNotTracked()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        EndpointsForRange.Mutable mutable = replicas.newMutable(2);
+        mutable.add(replicas.get(0));
+        mutable.add(Replica.transientReplica(replicas.get(1).endpoint(), replicas.range()));
+        replicas = mutable.asSnapshot();
+
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        verifier.expectDigest(peer1, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    private static class TestRepairedDataVerifier implements RepairedDataVerifier
+    {
+        private final RepairedDataTracker expected = new RepairedDataTracker(null);
+        private boolean verified = false;
+
+        private void expectDigest(InetAddressAndPort from, ByteBuffer digest, boolean conclusive)
+        {
+            expected.recordDigest(from, digest, conclusive);
+        }
+
+        @Override
+        public void verify(RepairedDataTracker tracker)
+        {
+            verified = expected.equals(tracker);
+        }
+    }
+
+    private DataResolver resolverWithVerifier(final ReadCommand command,
+                                              final ReplicaLayout.ForRange plan,
+                                              final ReadRepair readRepair,
+                                              final long queryStartNanoTime,
+                                              final RepairedDataVerifier verifier)
+    {
+        class TestableDataResolver extends DataResolver
+        {
+
+            public TestableDataResolver(ReadCommand command, ReplicaLayout.ForRange plan, ReadRepair readRepair, long queryStartNanoTime)
+            {
+                super(command, plan, readRepair, queryStartNanoTime);
+            }
+
+            protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
+            {
+                return verifier;
+            }
+        }
+
+        return new TestableDataResolver(command, plan, readRepair, queryStartNanoTime);
+    }
+
     private void assertRepairContainsDeletions(Mutation mutation,
                                                DeletionTime deletionTime,
                                                RangeTombstone...rangeTombstones)
@@ -954,4 +1302,19 @@ public class DataResolverTest extends AbstractReadResponseTest
     {
         return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
     }
+
+    private static void resolveAndConsume(DataResolver resolver)
+    {
+        try (PartitionIterator iterator = resolver.resolve())
+        {
+            while (iterator.hasNext())
+            {
+                try (RowIterator partition = iterator.next())
+                {
+                    while (partition.hasNext())
+                        partition.next();
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
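
The new DataResolverTest cases above all share one harness: a stub verifier is primed with
the digests it expects to be tracked, injected through an anonymous DataResolver subclass,
and checked only after resolveAndConsume has drained the merged result, since verification
fires when the resolved iterators are consumed and closed. As a hedged illustration of how
a further case would be written (helpers such as makeReplicas, plan, response, iter, cfm
and dk all come from the test class and its parent AbstractReadResponseTest), a
three-replica mismatch might look like:

    // Illustrative sketch only: three replicas, two distinct digests, all conclusive.
    EndpointsForRange replicas = makeReplicas(3);
    TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
    for (int i = 0; i < replicas.size(); i++)
    {
        // replicas 1 and 2 share "digest1", replica 0 reports "digest0"
        ByteBuffer digest = ByteBufferUtil.bytes("digest" + Math.min(i, 1));
        verifier.expectDigest(replicas.get(i).endpoint(), digest, true);
    }

    DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL),
                                                 readRepair, System.nanoTime(), verifier);
    for (int i = 0; i < replicas.size(); i++)
        resolver.preprocess(response(replicas.get(i).endpoint(),
                                     iter(PartitionUpdate.emptyUpdate(cfm, dk)),
                                     ByteBufferUtil.bytes("digest" + Math.min(i, 1)), true, command));

    resolveAndConsume(resolver);           // verification happens as the iterators close
    assertTrue(verifier.verified);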

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index a574d02..a4b7615 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -93,10 +93,10 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         ReadCallback readCallback = null;
 
         @Override
-        void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback)
         {
             assert readCallback == null || readCallback == callback;
-            readCommandRecipients.add(to);
+            readCommandRecipients.add(to.endpoint());
             readCallback = callback;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index c345ee6..a5efe27 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -141,7 +141,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
             Assert.assertNotNull(e.toMap());
         }
 
-        void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback)
         {
             assert readCallback == null || readCallback == callback;
             readCallback = callback;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index 9bb7eed..bee5ddd 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.service.reads.ReadCallback;
 
@@ -46,10 +47,10 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
         ReadCallback readCallback = null;
 
         @Override
-        void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback)
         {
             assert readCallback == null || readCallback == callback;
-            readCommandRecipients.add(to);
+            readCommandRecipients.add(to.endpoint());
             readCallback = callback;
         }
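
The InetAddressAndPort -> Replica signature change repeated across the three test doubles
above reflects read commands now being addressed to a Replica, which carries the endpoint
together with its full/transient status, rather than to a bare address; each stub simply
unwraps the endpoint it previously received directly. A hedged sketch of what the richer
parameter exposes (Replica.transientReplica mirrors its use in DataResolverTest above;
fullReplica, the status accessors, and the peer1/peer2/range names are assumed for
illustration):

    // Sketch only: a Replica bundles an endpoint with its replication status.
    Replica full = Replica.fullReplica(peer1, range);        // assumed factory, by analogy
    Replica trans = Replica.transientReplica(peer2, range);  // as used in DataResolverTest
    assert full.isFull() && trans.isTransient();             // assumed accessors
    readCommandRecipients.add(trans.endpoint());             // what the updated stubs record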
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
new file mode 100644
index 0000000..c916d13
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.net.UnknownHostException;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class RepairedDataVerifierTest
+{
+    private static final String TEST_NAME = "read_command_vh_test_";
+    private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
+    private static final String TABLE = "table1";
+
+    private final Random random = new Random();
+    private TableMetadata metadata;
+    private TableMetrics metrics;
+
+    // counter to generate the last byte of peer addresses
+    private int addressSuffix = 10;
+
+    @BeforeClass
+    public static void init()
+    {
+        SchemaLoader.loadSchema();
+        SchemaLoader.schemaDefinition(TEST_NAME);
+        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
+    }
+
+    @Before
+    public void setup()
+    {
+        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+        metrics = ColumnFamilyStore.metricsFor(metadata.id);
+    }
+
+    @Test
+    public void repairedDataMismatchWithSomeConclusive()
+    {
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount + 1, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMismatchWithNoneConclusive()
+    {
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount + 1, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMismatchWithAllConclusive()
+    {
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
+
+        tracker.verify();
+        assertEquals(confirmedCount + 1, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithAllConclusive()
+    {
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), true);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithSomeConclusive()
+    {
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithNoneConclusive()
+    {
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestWithAllConclusive()
+    {
+        // if a read didn't touch any repaired sstables, digests will be empty
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestsWithSomeConclusive()
+    {
+        // if a read didn't touch any repaired sstables, digests will be empty
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestsWithNoneConclusive()
+    {
+        // if a read didn't touch any repaired sstables, digests will be empty
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void noTrackingDataRecorded()
+    {
+        // if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded
+        long confirmedCount =  confirmedCount();
+        long unconfirmedCount =  unconfirmedCount();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    private long confirmedCount()
+    {
+        return metrics.confirmedRepairedInconsistencies.table.getCount();
+    }
+
+    private long unconfirmedCount()
+    {
+        return metrics.unconfirmedRepairedInconsistencies.table.getCount();
+    }
+
+    private InetAddressAndPort peer()
+    {
+        try
+        {
+            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private int key()
+    {
+        return random.nextInt();
+    }
+
+    private ReadCommand command(int key)
+    {
+        return new StubReadCommand(key, metadata, false);
+    }
+
+    private static class StubReadCommand extends SinglePartitionReadCommand
+    {
+        StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
+        {
+            super(isDigest,
+                  0,
+                  false,
+                  metadata,
+                  FBUtilities.nowInSeconds(),
+                  ColumnFilter.all(metadata),
+                  RowFilter.NONE,
+                  DataLimits.NONE,
+                  metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+                  null,
+                  null);
+        }
+    }
+}
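
Taken together, these RepairedDataVerifierTest cases pin down when each metric moves: a
mismatch exists only when more than one distinct digest is recorded; it counts as confirmed
when every digest was conclusive and as unconfirmed otherwise, with unconfirmed reporting
gated by the reportUnconfirmedRepairedDataMismatches switch enabled in init() above.
Matching digests, all-empty digests, and an empty tracker move neither counter. A hedged
sketch of that rule, for reference only (distinctDigests and anyInconclusive are
illustrative stand-ins for the tracker's recorded state, not the production source):

    // Sketch of the rule the tests encode.
    if (distinctDigests.size() > 1)                            // repaired datasets disagree
    {
        if (!anyInconclusive)
            metrics.confirmedRepairedInconsistencies.mark();   // all digests conclusive
        else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches()) // assumed getter overload
            metrics.unconfirmedRepairedInconsistencies.mark(); // some digest inconclusive
    }
    // identical digests (even all-empty) or no recorded digests: neither counter moves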

