Repository: cassandra
Updated Branches:
  refs/heads/trunk 994da255c -> 644676b08


Improve read repair blocking behavior

Patch by Blake Eggleston; reviewed by Marcus Eriksson and Alex Petrov
for CASSANDRA-10726


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/644676b0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/644676b0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/644676b0

Branch: refs/heads/trunk
Commit: 644676b088be5177ef1d0cdaf450306ea28d8a12
Parents: 994da25
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Mon May 14 14:24:03 2018 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Tue Aug 21 11:01:10 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ConsistencyLevel.java   |  10 +-
 .../cassandra/db/PartitionRangeReadCommand.java |   1 +
 .../org/apache/cassandra/db/ReadCommand.java    |   1 +
 .../db/SinglePartitionReadCommand.java          |   1 +
 .../cassandra/metrics/ReadRepairMetrics.java    |   5 +
 .../apache/cassandra/net/MessagingService.java  |   1 +
 .../apache/cassandra/service/StorageProxy.java  |  65 +++-
 .../service/reads/AbstractReadExecutor.java     |  40 +-
 .../reads/repair/BlockingPartitionRepair.java   | 243 +++++++++++++
 .../reads/repair/BlockingReadRepair.java        | 229 ++++++------
 .../reads/repair/BlockingReadRepairs.java       | 114 ++++++
 .../service/reads/repair/NoopReadRepair.java    |  29 +-
 .../repair/PartitionIteratorMergeListener.java  |  13 +-
 .../service/reads/repair/ReadRepair.java        |  44 ++-
 .../service/reads/repair/RepairListener.java    |  34 --
 .../reads/repair/RowIteratorMergeListener.java  |  31 +-
 .../service/reads/DataResolverTest.java         |  18 +-
 .../service/reads/repair/ReadRepairTest.java    | 361 +++++++++++++++++++
 .../reads/repair/TestableReadRepair.java        |  41 ++-
 20 files changed, 1046 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9fbaf25..b34979a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Improve read repair blocking behavior (CASSANDRA-10726)
  * Add a virtual table to expose settings (CASSANDRA-14573)
  * Fix up chunk cache handling of metrics (CASSANDRA-14628)
  * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 8f3a51c..d37da0a 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -138,12 +138,20 @@ public enum ConsistencyLevel
         }
     }
 
+    /**
+     * Determine if this consistency level meets or exceeds the consistency 
requirements of the given cl for the given keyspace
+     */
+    public boolean satisfies(ConsistencyLevel other, Keyspace keyspace)
+    {
+        return blockFor(keyspace) >= other.blockFor(keyspace);
+    }
+
     public boolean isDatacenterLocal()
     {
         return isDCLocal;
     }
 
-    public boolean isLocal(InetAddressAndPort endpoint)
+    public static boolean isLocal(InetAddressAndPort endpoint)
     {
         return 
DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
     }
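
A rough illustration of the new satisfies() check (a sketch, assuming a
hypothetical keyspace "ks" with replication factor 3):

    // With RF=3, QUORUM.blockFor(ks) == 2 and ONE.blockFor(ks) == 1, so a
    // QUORUM read meets or exceeds the requirements of a ONE read, but not
    // the reverse.
    Keyspace ks = Keyspace.open("ks"); // hypothetical keyspace, assumed RF=3
    assert ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.ONE, ks);
    assert !ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks);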

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index a6641d4..c312acc 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -24,6 +24,7 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index 82ca054..0262140 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.transform.RTBoundCloser;
 import org.apache.cassandra.db.transform.RTBoundValidator;
 import org.apache.cassandra.db.transform.StoppingTransformation;
 import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.UnknownIndexException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c091bf1..1fdb11f 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java 
b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
index c79fe89..e639be9 100644
--- a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
@@ -34,4 +34,9 @@ public class ReadRepairMetrics
     public static final Meter repairedBackground = 
Metrics.meter(factory.createMetricName("RepairedBackground"));
     @Deprecated
     public static final Meter attempted = 
Metrics.meter(factory.createMetricName("Attempted"));
+
+    public static final Meter speculatedRead = 
Metrics.meter(factory.createMetricName("SpeculatedRead"));
+    public static final Meter speculatedWrite = 
Metrics.meter(factory.createMetricName("SpeculatedWrite"));
+
+    public static void init() {}
 }
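
The empty init() appears intended to force class initialization: calling any
static method loads the class, which registers the meters above with the
metrics registry. A minimal sketch of the call-site pattern (StorageProxy's
static block, in its hunk below, does exactly this):

    // No-op call whose side effect is class initialization, ensuring the
    // SpeculatedRead/SpeculatedWrite meters exist before the first read.
    ReadRepairMetrics.init();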

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index b2e72f4..7732673 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -125,6 +125,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     public static final int VERSION_30 = 10;
     public static final int VERSION_3014 = 11;
     public static final int VERSION_40 = 12;
+    public static final int minimum_version = VERSION_30;
     public static final int current_version = VERSION_40;
 
     public static final byte[] ONE_BYTE = new byte[1];
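
The new minimum_version constant supports lookup tables indexed by
(version - minimum_version), one slot per supported messaging version. A
sketch of that idiom, as used by BlockingPartitionRepair later in this patch
("endpoint" is an assumed variable here):

    Mutation[] byVersion = new Mutation[MessagingService.current_version
                                        - MessagingService.minimum_version + 1];
    // getVersion returns the messaging version used to talk to that node
    int idx = MessagingService.instance().getVersion(endpoint)
              - MessagingService.minimum_version;
    Mutation cached = byVersion[idx];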

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 81e6dae..7fdf591 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -183,6 +183,8 @@ public class StorageProxy implements StorageProxyMBean
             readMetricsMap.put(level, new ClientRequestMetrics("Read-" + 
level.name()));
             writeMetricsMap.put(level, new ClientWriteRequestMetrics("Write-" 
+ level.name()));
         }
+
+        ReadRepairMetrics.init();
     }
 
     /**
@@ -1766,6 +1768,34 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    private static PartitionIterator 
concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair> 
repairs)
+    {
+        PartitionIterator concatenated = PartitionIterators.concat(iterators);
+
+        if (repairs.isEmpty())
+            return concatenated;
+
+        return new PartitionIterator()
+        {
+            public void close()
+            {
+                concatenated.close();
+                repairs.forEach(ReadRepair::maybeSendAdditionalWrites);
+                repairs.forEach(ReadRepair::awaitWrites);
+            }
+
+            public boolean hasNext()
+            {
+                return concatenated.hasNext();
+            }
+
+            public RowIterator next()
+            {
+                return concatenated.next();
+            }
+        };
+    }
+
     /**
      * This function executes local and remote reads, and blocks for the 
results:
      *
@@ -1784,43 +1814,59 @@ public class StorageProxy implements StorageProxyMBean
 
         AbstractReadExecutor[] reads = new AbstractReadExecutor[cmdCount];
 
+        // Get the replica locations, sorted by response time according to the 
snitch, and create a read executor
+        // for the type of speculation we'll use in this read
         for (int i=0; i<cmdCount; i++)
         {
             reads[i] = AbstractReadExecutor.getReadExecutor(commands.get(i), 
consistencyLevel, queryStartNanoTime);
         }
 
+        // sends a data request to the closest replica, and a digest request 
to the others. If we have a speculating
+        // read executor, we'll only send read requests to enough replicas to 
satisfy the consistency level
         for (int i=0; i<cmdCount; i++)
         {
             reads[i].executeAsync();
         }
 
+        // if we have a speculating read executor and it looks like we may not 
receive a response from the initial
+        // set of replicas we sent messages to, speculatively send an 
additional message to an un-contacted replica
         for (int i=0; i<cmdCount; i++)
         {
             reads[i].maybeTryAdditionalReplicas();
         }
 
+        // wait for enough responses to meet the consistency level. If there's 
a digest mismatch, begin the read
+        // repair process by sending full data reads to all replicas we 
received responses from.
         for (int i=0; i<cmdCount; i++)
         {
             reads[i].awaitResponses();
         }
 
+        // read repair - if it looks like we may not receive enough full data 
responses to meet CL, send
+        // an additional request to any remaining replicas we haven't 
contacted (if there are any)
         for (int i=0; i<cmdCount; i++)
         {
-            reads[i].maybeRepairAdditionalReplicas();
+            reads[i].maybeSendAdditionalDataRequests();
         }
 
+        // read repair - block on full data responses
         for (int i=0; i<cmdCount; i++)
         {
             reads[i].awaitReadRepair();
         }
 
+        // if we didn't do a read repair, return the contents of the data 
response; if we did do a read
+        // repair, merge the full data reads
         List<PartitionIterator> results = new ArrayList<>(cmdCount);
+        List<ReadRepair> repairs = new ArrayList<>(cmdCount);
         for (int i=0; i<cmdCount; i++)
         {
             results.add(reads[i].getResult());
+            repairs.add(reads[i].getReadRepair());
         }
 
-        return PartitionIterators.concat(results);
+        // if we did a read repair, assemble repair mutations and block on them
+        return concatAndBlockOnRepair(results, repairs);
     }
 
     public static class LocalReadRunnable extends DroppableRunnable
@@ -2029,12 +2075,14 @@ public class StorageProxy implements StorageProxyMBean
     {
         private final DataResolver resolver;
         private final ReadCallback handler;
+        private final ReadRepair readRepair;
         private PartitionIterator result;
 
-        private SingleRangeResponse(DataResolver resolver, ReadCallback 
handler)
+        private SingleRangeResponse(DataResolver resolver, ReadCallback 
handler, ReadRepair readRepair)
         {
             this.resolver = resolver;
             this.handler = handler;
+            this.readRepair = readRepair;
         }
 
         private void waitForResponse() throws ReadTimeoutException
@@ -2165,7 +2213,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             PartitionRangeReadCommand rangeCommand = 
command.forSubRange(toQuery.range, isFirst);
 
-            ReadRepair readRepair = ReadRepair.create(command, 
toQuery.filteredEndpoints, queryStartNanoTime, consistency);
+            ReadRepair readRepair = ReadRepair.create(command, 
queryStartNanoTime, consistency);
             DataResolver resolver = new DataResolver(keyspace, rangeCommand, 
consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime, readRepair);
 
             int blockFor = consistency.blockFor(keyspace);
@@ -2188,15 +2236,18 @@ public class StorageProxy implements StorageProxyMBean
                 }
             }
 
-            return new SingleRangeResponse(resolver, handler);
+            return new SingleRangeResponse(resolver, handler, readRepair);
         }
 
         private PartitionIterator sendNextRequests()
         {
             List<PartitionIterator> concurrentQueries = new 
ArrayList<>(concurrencyFactor);
+            List<ReadRepair> readRepairs = new ArrayList<>(concurrencyFactor);
             for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
             {
-                concurrentQueries.add(query(ranges.next(), i == 0));
+                SingleRangeResponse response = query(ranges.next(), i == 0);
+                concurrentQueries.add(response);
+                readRepairs.add(response.readRepair);
                 ++rangesQueried;
             }
 
@@ -2204,7 +2255,7 @@ public class StorageProxy implements StorageProxyMBean
             // We want to count the results for the sake of updating the 
concurrency factor (see updateConcurrencyFactor) but we don't want to
             // enforce any particular limit at this point (this could break 
code than rely on postReconciliationProcessing), hence the DataLimits.NONE.
             counter = DataLimits.NONE.newCounter(command.nowInSec(), true, 
command.selectsFullPartition(), enforceStrictLiveness);
-            return 
counter.applyTo(PartitionIterators.concat(concurrentQueries));
+            return counter.applyTo(concatAndBlockOnRepair(concurrentQueries, 
readRepairs));
         }
 
         public void close()
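
Taken together, the commented phases give fetchRows() this shape (a condensed
sketch of the control flow, not the full method body):

    for (AbstractReadExecutor read : reads) read.executeAsync();               // data + digest requests
    for (AbstractReadExecutor read : reads) read.maybeTryAdditionalReplicas(); // speculative reads
    for (AbstractReadExecutor read : reads) read.awaitResponses();             // block for CL; mismatch starts repair
    for (AbstractReadExecutor read : reads) read.maybeSendAdditionalDataRequests(); // speculative repair reads
    for (AbstractReadExecutor read : reads) read.awaitReadRepair();            // block on repair data reads
    // repair writes are only awaited when the returned iterator is closed,
    // via concatAndBlockOnRepair(results, repairs)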

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index e531e0c..61b9948 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -69,17 +69,23 @@ public abstract class AbstractReadExecutor
     protected final long queryStartNanoTime;
     protected volatile PartitionIterator result = null;
 
+    protected final Keyspace keyspace;
+    protected final int blockFor;
+
     AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand 
command, ConsistencyLevel consistency, List<InetAddressAndPort> targetReplicas, 
long queryStartNanoTime)
     {
         this.command = command;
         this.consistency = consistency;
         this.targetReplicas = targetReplicas;
-        this.readRepair = ReadRepair.create(command, targetReplicas, 
queryStartNanoTime, consistency);
+        this.readRepair = ReadRepair.create(command, queryStartNanoTime, 
consistency);
         this.digestResolver = new DigestResolver(keyspace, command, 
consistency, readRepair, targetReplicas.size());
         this.handler = new ReadCallback(digestResolver, consistency, command, 
targetReplicas, queryStartNanoTime);
         this.cfs = cfs;
         this.traceState = Tracing.instance.get();
         this.queryStartNanoTime = queryStartNanoTime;
+        this.keyspace = keyspace;
+        this.blockFor = consistency.blockFor(keyspace);
+
 
         // Set the digest version (if we request some digests). This is the 
smallest version amongst all our target replicas since new nodes
        // know how to produce older digests but the reverse is not true.
@@ -91,16 +97,16 @@ public abstract class AbstractReadExecutor
         command.setDigestVersion(digestVersion);
     }
 
-    private DecoratedKey getKey()
+    public DecoratedKey getKey()
     {
-        if (command instanceof SinglePartitionReadCommand)
-        {
-            return ((SinglePartitionReadCommand) command).partitionKey();
-        }
-        else
-        {
-            return null;
-        }
+        Preconditions.checkState(command instanceof SinglePartitionReadCommand,
+                                 "Can only get keys for 
SinglePartitionReadCommand");
+        return ((SinglePartitionReadCommand) command).partitionKey();
+    }
+
+    public ReadRepair getReadRepair()
+    {
+        return readRepair;
     }
 
     protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
@@ -409,7 +415,7 @@ public abstract class AbstractReadExecutor
     {
         try
         {
-            readRepair.awaitRepair();
+            readRepair.awaitReads();
         }
         catch (ReadTimeoutException e)
         {
@@ -424,9 +430,17 @@ public abstract class AbstractReadExecutor
         }
     }
 
-    public void maybeRepairAdditionalReplicas()
+    boolean isDone()
     {
-        // TODO: this
+        return result != null;
+    }
+
+    public void maybeSendAdditionalDataRequests()
+    {
+        if (isDone())
+            return;
+
+        readRepair.maybeSendAdditionalReads();
     }
 
     public PartitionIterator getResult() throws ReadFailureException, 
ReadTimeoutException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
new file mode 100644
index 0000000..eb402ba
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tracing.Tracing;
+
+public class BlockingPartitionRepair extends AbstractFuture<Object> implements 
IAsyncCallback<Object>
+{
+    private final Keyspace keyspace;
+    private final DecoratedKey key;
+    private final ConsistencyLevel consistency;
+    private final InetAddressAndPort[] participants;
+    private final ConcurrentMap<InetAddressAndPort, Mutation> pendingRepairs;
+    private final CountDownLatch latch;
+
+    private volatile long mutationsSentTime;
+
+    public BlockingPartitionRepair(Keyspace keyspace, DecoratedKey key, 
ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int 
maxBlockFor, InetAddressAndPort[] participants)
+    {
+        this.keyspace = keyspace;
+        this.key = key;
+        this.consistency = consistency;
+        this.pendingRepairs = new ConcurrentHashMap<>(repairs);
+        this.participants = participants;
+
+        // here we subtract replicas that have no repair mutations from the 
blockFor total, since
+        // we're not sending them mutations
+        int blockFor = maxBlockFor;
+        for (InetAddressAndPort participant: participants)
+        {
+            // remote dcs can sometimes get involved in dc-local reads. We 
want to repair
+            // them if they do, but they shouldn't interfere with blocking the 
client read.
+            if (!repairs.containsKey(participant) && 
shouldBlockOn(participant))
+                blockFor--;
+        }
+
+        // there are some cases where logically identical data can return 
different digests
+        // For read repair, this would result in ReadRepairHandler being 
called with a map of
+        // empty mutations. If we'd also speculated on either of the read 
stages, the number
+        // of empty mutations would be greater than blockFor, causing the 
latch ctor to throw
+        // an illegal argument exception due to a negative start value. So 
here we clamp it to 0
+        latch = new CountDownLatch(Math.max(blockFor, 0));
+    }
+
+    @VisibleForTesting
+    long waitingOn()
+    {
+        return latch.getCount();
+    }
+
+    @VisibleForTesting
+    boolean isLocal(InetAddressAndPort endpoint)
+    {
+        return ConsistencyLevel.isLocal(endpoint);
+    }
+
+    private boolean shouldBlockOn(InetAddressAndPort endpoint)
+    {
+        return !consistency.isDatacenterLocal() || isLocal(endpoint);
+    }
+
+    @VisibleForTesting
+    void ack(InetAddressAndPort from)
+    {
+        if (shouldBlockOn(from))
+        {
+            pendingRepairs.remove(from);
+            latch.countDown();
+        }
+    }
+
+    @Override
+    public void response(MessageIn<Object> msg)
+    {
+        ack(msg.from);
+    }
+
+    @Override
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
+
+    private static PartitionUpdate extractUpdate(Mutation mutation)
+    {
+        return Iterables.getOnlyElement(mutation.getPartitionUpdates());
+    }
+
+    /**
+     * Combine the contents of any unacked repair into a single update
+     */
+    private PartitionUpdate mergeUnackedUpdates()
+    {
+        // recombine the updates
+        List<PartitionUpdate> updates = 
Lists.newArrayList(Iterables.transform(pendingRepairs.values(), 
BlockingPartitionRepair::extractUpdate));
+        return updates.isEmpty() ? null : PartitionUpdate.merge(updates);
+    }
+
+    @VisibleForTesting
+    protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort 
endpoint)
+    {
+        MessagingService.instance().sendRR(message, endpoint, this);
+    }
+
+    public void sendInitialRepairs()
+    {
+        mutationsSentTime = System.nanoTime();
+        for (Map.Entry<InetAddressAndPort, Mutation> entry: 
pendingRepairs.entrySet())
+        {
+            InetAddressAndPort destination = entry.getKey();
+            Mutation mutation = entry.getValue();
+            TableId tableId = extractUpdate(mutation).metadata().id;
+
+            Tracing.trace("Sending read-repair-mutation to {}", destination);
+            // use a separate verb here to avoid writing hints on timeouts
+            sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), 
destination);
+            ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark();
+
+            if (!shouldBlockOn(destination))
+                pendingRepairs.remove(destination);
+        }
+    }
+
+    public boolean awaitRepairs(long timeout, TimeUnit timeoutUnit)
+    {
+        long elapsed = System.nanoTime() - mutationsSentTime;
+        long remaining = timeoutUnit.toNanos(timeout) - elapsed;
+
+        try
+        {
+            return latch.await(remaining, TimeUnit.NANOSECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    private static int msgVersionIdx(int version)
+    {
+        return version - MessagingService.minimum_version;
+    }
+
+    /**
+     * If it looks like we might not receive acks for all the repair mutations 
we sent out, combine all
+     * the unacked mutations and send them to the minority of nodes not 
involved in the read repair data
+     * read / write cycle. We will accept acks from them in lieu of acks from 
the initial mutations sent
+     * out, so long as we receive the same number of acks as repair mutations 
transmitted. This prevents
+     * misbehaving nodes from killing a quorum read, while continuing to 
guarantee monotonic quorum reads
+     */
+    public void maybeSendAdditionalWrites(long timeout, TimeUnit timeoutUnit)
+    {
+        if (awaitRepairs(timeout, timeoutUnit))
+            return;
+
+        Set<InetAddressAndPort> exclude = Sets.newHashSet(participants);
+        Iterable<InetAddressAndPort> candidates = 
Iterables.filter(getCandidateEndpoints(), e -> !exclude.contains(e));
+        if (Iterables.isEmpty(candidates))
+            return;
+
+        PartitionUpdate update = mergeUnackedUpdates();
+        if (update == null)
+            // the final response was received between the speculation
+            // timeout and the call to collect the unacked mutations.
+            return;
+
+        ReadRepairMetrics.speculatedWrite.mark();
+
+        Mutation[] versionedMutations = new 
Mutation[msgVersionIdx(MessagingService.current_version) + 1];
+
+        for (InetAddressAndPort endpoint: candidates)
+        {
+            int versionIdx = 
msgVersionIdx(MessagingService.instance().getVersion(endpoint));
+
+            Mutation mutation = versionedMutations[versionIdx];
+
+            if (mutation == null)
+            {
+                mutation = BlockingReadRepairs.createRepairMutation(update, 
consistency, endpoint, true);
+                versionedMutations[versionIdx] = mutation;
+            }
+
+            if (mutation == null)
+            {
+                // the mutation is too large to send.
+                continue;
+            }
+
+            Tracing.trace("Sending speculative read-repair-mutation to {}", 
endpoint);
+            sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), 
endpoint);
+        }
+    }
+
+    @VisibleForTesting
+    protected Iterable<InetAddressAndPort> getCandidateEndpoints()
+    {
+        return BlockingReadRepairs.getCandidateEndpoints(keyspace, 
key.getToken(), consistency);
+    }
+
+}
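
The latch counts one slot per blocking participant that was actually sent a
mutation, and awaitRepairs() deducts time already elapsed since
sendInitialRepairs(). A sketch of the expected call sequence, mirroring how
BlockingReadRepair drives this class:

    BlockingPartitionRepair repair =
        new BlockingPartitionRepair(keyspace, key, consistency, mutations,
                                    consistency.blockFor(keyspace), participants);
    repair.sendInitialRepairs();                  // starts the ack clock
    // if acks look slow, write the unacked data to uncontacted replicas:
    repair.maybeSendAdditionalWrites(sampleLatencyNanos, TimeUnit.NANOSECONDS);
    // finally, block until enough acks arrive (or the timeout expires):
    boolean acked = repair.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout(),
                                        TimeUnit.MILLISECONDS);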

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index cd4d2a7..c5f1bea 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -18,20 +18,17 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 
-import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,13 +40,15 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
-import org.apache.cassandra.net.AsyncOneResponse;
-import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.reads.DataResolver;
 import org.apache.cassandra.service.reads.DigestResolver;
@@ -60,17 +59,14 @@ import org.apache.cassandra.tracing.Tracing;
  * 'Classic' read repair. Doesn't allow the client read to return until
  *  updates have been written to nodes needing correction.
  */
-public class BlockingReadRepair implements ReadRepair, RepairListener
+public class BlockingReadRepair implements ReadRepair
 {
     private static final Logger logger = 
LoggerFactory.getLogger(BlockingReadRepair.class);
 
-    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
-        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
-
     private final ReadCommand command;
-    private final List<InetAddressAndPort> endpoints;
     private final long queryStartNanoTime;
     private final ConsistencyLevel consistency;
+    private final ColumnFamilyStore cfs;
 
     private final Queue<BlockingPartitionRepair> repairs = new 
ConcurrentLinkedQueue<>();
 
@@ -81,170 +77,149 @@ public class BlockingReadRepair implements ReadRepair, 
RepairListener
         private final DataResolver dataResolver;
         private final ReadCallback readCallback;
         private final Consumer<PartitionIterator> resultConsumer;
+        private final List<InetAddressAndPort> initialContacts;
 
-        public DigestRepair(DataResolver dataResolver, ReadCallback 
readCallback, Consumer<PartitionIterator> resultConsumer)
+        public DigestRepair(DataResolver dataResolver, ReadCallback 
readCallback, Consumer<PartitionIterator> resultConsumer, 
List<InetAddressAndPort> initialContacts)
         {
             this.dataResolver = dataResolver;
             this.readCallback = readCallback;
             this.resultConsumer = resultConsumer;
+            this.initialContacts = initialContacts;
         }
     }
 
     public BlockingReadRepair(ReadCommand command,
-                              List<InetAddressAndPort> endpoints,
                               long queryStartNanoTime,
                               ConsistencyLevel consistency)
     {
         this.command = command;
-        this.endpoints = endpoints;
         this.queryStartNanoTime = queryStartNanoTime;
         this.consistency = consistency;
+        this.cfs = Keyspace.openAndGetStore(command.metadata());
     }
 
     public UnfilteredPartitionIterators.MergeListener 
getMergeListener(InetAddressAndPort[] endpoints)
     {
-        return new PartitionIteratorMergeListener(endpoints, command, this);
+        return new PartitionIteratorMergeListener(endpoints, command, 
consistency, this);
+    }
+
+    private int getMaxResponses()
+    {
+        AbstractReplicationStrategy strategy = 
cfs.keyspace.getReplicationStrategy();
+        if (consistency.isDatacenterLocal() && strategy instanceof 
NetworkTopologyStrategy)
+        {
+            NetworkTopologyStrategy nts = (NetworkTopologyStrategy) strategy;
+            return 
nts.getReplicationFactor(DatabaseDescriptor.getLocalDataCenter());
+        }
+        else
+        {
+            return strategy.getReplicationFactor();
+        }
     }
 
-    public static class BlockingPartitionRepair extends AbstractFuture<Object> 
implements RepairListener.PartitionRepair
+    // digestResolver isn't used here because we resend read requests to all 
participants
+    public void startRepair(DigestResolver digestResolver, 
List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> 
contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
     {
+        ReadRepairMetrics.repairedBlocking.mark();
+
+        // Do a full data read to resolve the correct response (and repair 
nodes that need it)
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        DataResolver resolver = new DataResolver(keyspace, command, 
ConsistencyLevel.ALL, getMaxResponses(), queryStartNanoTime, this);
+        ReadCallback readCallback = new ReadCallback(resolver, 
ConsistencyLevel.ALL, consistency.blockFor(cfs.keyspace), command,
+                                                     keyspace, allEndpoints, 
queryStartNanoTime);
 
-        final List<AsyncOneResponse<?>> responses;
-        final ReadCommand command;
-        final ConsistencyLevel consistency;
+        digestRepair = new DigestRepair(resolver, readCallback, 
resultConsumer, contactedEndpoints);
 
-        public BlockingPartitionRepair(int expectedResponses, ReadCommand 
command, ConsistencyLevel consistency)
+        for (InetAddressAndPort endpoint : contactedEndpoints)
         {
-            this.responses = new ArrayList<>(expectedResponses);
-            this.command = command;
-            this.consistency = consistency;
+            Tracing.trace("Enqueuing full data read to {}", endpoint);
+            
MessagingService.instance().sendRRWithFailure(command.createMessage(), 
endpoint, readCallback);
         }
+    }
 
-        private AsyncOneResponse sendRepairMutation(Mutation mutation, 
InetAddressAndPort destination)
-        {
-            DecoratedKey key = mutation.key();
-            Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-            int messagingVersion = 
MessagingService.instance().getVersion(destination);
+    public void awaitReads() throws ReadTimeoutException
+    {
+        DigestRepair repair = digestRepair;
+        if (repair == null)
+            return;
 
-            int    mutationSize = (int) 
Mutation.serializer.serializedSize(mutation, messagingVersion);
-            int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+        repair.readCallback.awaitResults();
+        repair.resultConsumer.accept(digestRepair.dataResolver.resolve());
+    }
 
-            AsyncOneResponse callback = null;
+    private boolean shouldSpeculate()
+    {
+        ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
+        return  consistency != ConsistencyLevel.EACH_QUORUM
+               && consistency.satisfies(speculativeCL, cfs.keyspace)
+               && cfs.sampleLatencyNanos <= 
TimeUnit.MILLISECONDS.toNanos(command.getTimeout());
+    }
 
-            if (mutationSize <= maxMutationSize)
-            {
-                Tracing.trace("Sending read-repair-mutation to {}", 
destination);
-                // use a separate verb here to avoid writing hints on timeouts
-                MessageOut<Mutation> message = 
mutation.createMessage(MessagingService.Verb.READ_REPAIR);
-                callback = MessagingService.instance().sendRR(message, 
destination);
-                
ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark();
-            }
-            else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
-            {
-                logger.debug("Encountered an oversized ({}/{}) read repair 
mutation for table {}, key {}, node {}",
-                             mutationSize,
-                             maxMutationSize,
-                             command.metadata(),
-                             
command.metadata().partitionKeyType.getString(key.getKey()),
-                             destination);
-            }
-            else
-            {
-                logger.warn("Encountered an oversized ({}/{}) read repair 
mutation for table {}, key {}, node {}",
-                            mutationSize,
-                            maxMutationSize,
-                            command.metadata(),
-                            
command.metadata().partitionKeyType.getString(key.getKey()),
-                            destination);
-
-                int blockFor = consistency.blockFor(keyspace);
-                Tracing.trace("Timed out while read-repairing after receiving 
all {} data and digest responses", blockFor);
-                throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
-            }
-            return callback;
-        }
+    public void maybeSendAdditionalReads()
+    {
+        Preconditions.checkState(command instanceof SinglePartitionReadCommand,
+                                 "maybeSendAdditionalReads can only be called 
for SinglePartitionReadCommand");
+        DigestRepair repair = digestRepair;
+        if (repair == null)
+            return;
 
-        public void reportMutation(InetAddressAndPort endpoint, Mutation 
mutation)
+        if (shouldSpeculate() && 
!repair.readCallback.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS))
         {
-            AsyncOneResponse<?> response = sendRepairMutation(mutation, 
endpoint);
+            Set<InetAddressAndPort> contacted = 
Sets.newHashSet(repair.initialContacts);
+            Token replicaToken = ((SinglePartitionReadCommand) 
command).partitionKey().getToken();
+            Iterable<InetAddressAndPort> candidates = 
BlockingReadRepairs.getCandidateEndpoints(cfs.keyspace, replicaToken, 
consistency);
+            boolean speculated = false;
+            for (InetAddressAndPort endpoint: Iterables.filter(candidates, e 
-> !contacted.contains(e)))
+            {
+                speculated = true;
+                Tracing.trace("Enqueuing speculative full data read to {}", 
endpoint);
+                MessagingService.instance().sendRR(command.createMessage(), 
endpoint, repair.readCallback);
+                break;
+            }
 
-            if (response != null)
-                responses.add(response);
+            if (speculated)
+                ReadRepairMetrics.speculatedRead.mark();
         }
+    }
 
-        public void finish()
+    @Override
+    public void maybeSendAdditionalWrites()
+    {
+        for (BlockingPartitionRepair repair: repairs)
         {
-            Futures.addCallback(Futures.allAsList(responses), new 
FutureCallback<List<Object>>()
-            {
-                public void onSuccess(@Nullable List<Object> result)
-                {
-                    set(result);
-                }
-
-                public void onFailure(Throwable t)
-                {
-                    setException(t);
-                }
-            });
+            repair.maybeSendAdditionalWrites(cfs.sampleLatencyNanos, 
TimeUnit.NANOSECONDS);
         }
     }
 
-    public void awaitRepairs(long timeout)
+    @Override
+    public void awaitWrites()
     {
-        try
+        boolean timedOut = false;
+        for (BlockingPartitionRepair repair: repairs)
         {
-            Futures.allAsList(repairs).get(timeout, TimeUnit.MILLISECONDS);
+            if (!repair.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout(), 
TimeUnit.MILLISECONDS))
+            {
+                timedOut = true;
+            }
         }
-        catch (TimeoutException ex)
+        if (timedOut)
         {
             // We got all responses, but timed out while repairing
-            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-            int blockFor = consistency.blockFor(keyspace);
+            int blockFor = consistency.blockFor(cfs.keyspace);
             if (Tracing.isTracing())
                 Tracing.trace("Timed out while read-repairing after receiving 
all {} data and digest responses", blockFor);
             else
                 logger.debug("Timeout while read-repairing after receiving all 
{} data and digest responses", blockFor);
 
-            throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
-        }
-        catch (InterruptedException | ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public PartitionRepair startPartitionRepair()
-    {
-        BlockingPartitionRepair repair = new 
BlockingPartitionRepair(endpoints.size(), command, consistency);
-        repairs.add(repair);
-        return repair;
-    }
-
-    public void startRepair(DigestResolver digestResolver, 
List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> 
contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
-    {
-        ReadRepairMetrics.repairedBlocking.mark();
-
-        // Do a full data read to resolve the correct response (and repair 
node that need be)
-        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-        DataResolver resolver = new DataResolver(keyspace, command, 
ConsistencyLevel.ALL, allEndpoints.size(), queryStartNanoTime, this);
-        ReadCallback readCallback = new ReadCallback(resolver, 
ConsistencyLevel.ALL, contactedEndpoints.size(), command,
-                                                     keyspace, allEndpoints, 
queryStartNanoTime);
-
-        digestRepair = new DigestRepair(resolver, readCallback, 
resultConsumer);
-
-        for (InetAddressAndPort endpoint : contactedEndpoints)
-        {
-            Tracing.trace("Enqueuing full data read to {}", endpoint);
-            
MessagingService.instance().sendRRWithFailure(command.createMessage(), 
endpoint, readCallback);
+            throw new ReadTimeoutException(consistency, blockFor-1, blockFor, 
true);
         }
     }
 
-    public void awaitRepair() throws ReadTimeoutException
+    @Override
+    public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, 
Mutation> mutations, InetAddressAndPort[] destinations)
     {
-        if (digestRepair != null)
-        {
-            digestRepair.readCallback.awaitResults();
-            
digestRepair.resultConsumer.accept(digestRepair.dataResolver.resolve());
-        }
+        BlockingPartitionRepair blockingRepair = new 
BlockingPartitionRepair(cfs.keyspace, key, consistency, mutations, 
consistency.blockFor(cfs.keyspace), destinations);
+        blockingRepair.sendInitialRepairs();
+        repairs.add(blockingRepair);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
new file mode 100644
index 0000000..e5f7179
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.tracing.Tracing;
+
+public class BlockingReadRepairs
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BlockingReadRepairs.class);
+
+    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
+    /**
+     * Returns all of the endpoints that are replicas for the given key. If 
the consistency level is datacenter
+     * local, only the endpoints in the local dc will be returned.
+     */
+    static Iterable<InetAddressAndPort> getCandidateEndpoints(Keyspace 
keyspace, Token token, ConsistencyLevel consistency)
+    {
+        List<InetAddressAndPort> endpoints = 
StorageProxy.getLiveSortedEndpoints(keyspace, token);
+        return consistency.isDatacenterLocal() && 
keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy
+               ? Iterables.filter(endpoints, ConsistencyLevel::isLocal)
+               : endpoints;
+    }
+
+    /**
+     * Create a read repair mutation from the given update, if the mutation is 
not larger than the maximum
+     * mutation size; otherwise return null or, if we're configured to be 
strict, throw an exception.
+     */
+    public static Mutation createRepairMutation(PartitionUpdate update, 
ConsistencyLevel consistency, InetAddressAndPort destination, boolean 
suppressException)
+    {
+        if (update == null)
+            return null;
+
+        DecoratedKey key = update.partitionKey();
+        Mutation mutation = new Mutation(update);
+        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+        TableMetadata metadata = update.metadata();
+
+        int messagingVersion = 
MessagingService.instance().getVersion(destination);
+
+        int    mutationSize = (int) 
Mutation.serializer.serializedSize(mutation, messagingVersion);
+        int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+        if (mutationSize <= maxMutationSize)
+        {
+            return mutation;
+        }
+        else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+        {
+            logger.debug("Encountered an oversized ({}/{}) read repair 
mutation for table {}, key {}, node {}",
+                         mutationSize,
+                         maxMutationSize,
+                         metadata,
+                         metadata.partitionKeyType.getString(key.getKey()),
+                         destination);
+            return null;
+        }
+        else
+        {
+            logger.warn("Encountered an oversized ({}/{}) read repair mutation 
for table {}, key {}, node {}",
+                        mutationSize,
+                        maxMutationSize,
+                        metadata,
+                        metadata.partitionKeyType.getString(key.getKey()),
+                        destination);
+
+            if (!suppressException)
+            {
+                int blockFor = consistency.blockFor(keyspace);
+                Tracing.trace("Timed out while read-repairing after receiving 
all {} data and digest responses", blockFor);
+                throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
+            }
+            return null;
+        }
+    }
+}
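
The null contract matters to callers: null means either nothing to repair or
an oversized mutation that was deliberately dropped. A sketch of the expected
handling, modeled on the speculative-write path in BlockingPartitionRepair:

    Mutation mutation = BlockingReadRepairs.createRepairMutation(update, consistency,
                                                                 endpoint, true);
    if (mutation == null)
        return; // nothing to send, or the mutation was too large and dropped
    sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), endpoint);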

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 4436f3a..6e161a8 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -19,8 +19,11 @@
 package org.apache.cassandra.service.reads.repair;
 
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
@@ -43,7 +46,31 @@ public class NoopReadRepair implements ReadRepair
         resultConsumer.accept(digestResolver.getData());
     }
 
-    public void awaitRepair() throws ReadTimeoutException
+    public void awaitReads() throws ReadTimeoutException
     {
     }
+
+    @Override
+    public void maybeSendAdditionalReads()
+    {
+
+    }
+
+    @Override
+    public void maybeSendAdditionalWrites()
+    {
+
+    }
+
+    @Override
+    public void awaitWrites()
+    {
+
+    }
+
+    @Override
+    public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, 
Mutation> mutations, InetAddressAndPort[] destinations)
+    {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
 
b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 4ccdcbf..6cf761a 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.service.reads.repair;
 
 import java.util.List;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.RegularAndStaticColumns;
@@ -34,18 +34,20 @@ public class PartitionIteratorMergeListener implements 
UnfilteredPartitionIterat
 {
     private final InetAddressAndPort[] sources;
     private final ReadCommand command;
-    private final RepairListener repairListener;
+    private final ConsistencyLevel consistency;
+    private final ReadRepair readRepair;
 
-    public PartitionIteratorMergeListener(InetAddressAndPort[] sources, 
ReadCommand command, RepairListener repairListener)
+    public PartitionIteratorMergeListener(InetAddressAndPort[] sources, 
ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
     {
         this.sources = sources;
         this.command = command;
-        this.repairListener = repairListener;
+        this.consistency = consistency;
+        this.readRepair = readRepair;
     }
 
     public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
     {
-        return new RowIteratorMergeListener(partitionKey, columns(versions), 
isReversed(versions), sources, command, repairListener);
+        return new RowIteratorMergeListener(partitionKey, columns(versions), 
isReversed(versions), sources, command, consistency, readRepair);
     }
 
     private RegularAndStaticColumns columns(List<UnfilteredRowIterator> 
versions)
@@ -81,7 +83,6 @@ public class PartitionIteratorMergeListener implements 
UnfilteredPartitionIterat
 
     public void close()
     {
-        repairListener.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout());
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 289875d..a1a9546 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -18,9 +18,12 @@
 package org.apache.cassandra.service.reads.repair;
 
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
@@ -38,6 +41,10 @@ public interface ReadRepair
     /**
      * Called when the digests from the initial read don't match. Reads may 
block on the
      * repair started by this method.
+     * @param digestResolver supplied so we can get the original data response
+     * @param allEndpoints all available replicas for this read
+     * @param contactedEndpoints the replicas we actually sent requests to
+     * @param resultConsumer hook for the repair to set its result on 
completion
      */
     public void startRepair(DigestResolver digestResolver,
                             List<InetAddressAndPort> allEndpoints,
@@ -45,12 +52,41 @@ public interface ReadRepair
                             Consumer<PartitionIterator> resultConsumer);
 
     /**
-     * Wait for any operations started by {@link ReadRepair#startRepair} to 
complete
+     * Block on the reads (or timeout) sent out in {@link 
ReadRepair#startRepair}
      */
-    public void awaitRepair() throws ReadTimeoutException;
+    public void awaitReads() throws ReadTimeoutException;
 
-    static ReadRepair create(ReadCommand command, List<InetAddressAndPort> 
endpoints, long queryStartNanoTime, ConsistencyLevel consistency)
+    /**
+     * if it looks like we might not receive data requests from everyone in 
time, send additional requests
+     * to additional replicas not contacted in the initial full data read. If 
the collection of nodes that
+     * end up responding in time end up agreeing on the data, and we don't 
consider the response from the
+     * disagreeing replica that triggered the read repair, that's ok, since 
the disagreeing data would not
+     * have been successfully written and won't be included in the response 
the the client, preserving the
+     * expectation of monotonic quorum reads
+     */
+    public void maybeSendAdditionalReads();
+
+    /**
+     * If it looks like we might not receive acks for all the repair mutations 
we sent out, combine all
+     * the unacked mutations and send them to the minority of nodes not 
involved in the read repair data
+     * read / write cycle. We will accept acks from them in lieu of acks from 
the initial mutations sent
+     * out, so long as we receive the same number of acks as repair mutations 
transmitted. This prevents
+     * misbehaving nodes from killing a quorum read, while continuing to 
guarantee monotonic quorum reads
+     */
+    public void maybeSendAdditionalWrites();
+
+    /**
+     * Hook for the merge listener to start repairs on individual partitions.
+     */
+    void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> 
mutations, InetAddressAndPort[] destinations);
+
+    /**
+     * Block on any mutations (or timeout) we sent out to repair replicas in 
{@link ReadRepair#repairPartition}
+     */
+    public void awaitWrites();
+
+    static ReadRepair create(ReadCommand command, long queryStartNanoTime, 
ConsistencyLevel consistency)
     {
-        return new BlockingReadRepair(command, endpoints, queryStartNanoTime, 
consistency);
+        return new BlockingReadRepair(command, queryStartNanoTime, 
consistency);
     }
 }
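
Taken together, the methods above give the coordinator a fixed-order read/write repair lifecycle. The sketch below is illustrative only: it models the intended call sequence with simplified stand-in types (ReadRepairLifecycle and CoordinatorSketch are hypothetical names, and endpoints/mutations are reduced to placeholder types; none of this is part of the patch).

    // Illustrative sketch of the coordinator-side lifecycle; stand-in types,
    // not the real Cassandra classes.
    import java.util.Map;

    interface ReadRepairLifecycle
    {
        void startRepair();               // full data reads after a digest mismatch
        void maybeSendAdditionalReads();  // hedge: extra data reads if replicas look slow
        void awaitReads();                // block until enough data responses (or timeout)
        void repairPartition(Object key, Map<String, Object> mutations); // per-partition diffs
        void maybeSendAdditionalWrites(); // hedge: combined mutation to an uninvolved replica
        void awaitWrites();               // block until enough repair acks (or timeout)
    }

    final class CoordinatorSketch
    {
        // Expected order of operations on the digest-mismatch path.
        static void digestMismatchPath(ReadRepairLifecycle rr)
        {
            rr.startRepair();               // 1. re-read full data from the contacted replicas
            rr.maybeSendAdditionalReads();  // 2. speculate if responses are late
            rr.awaitReads();                // 3. wait for the data needed to resolve the read

            // 4. merging the responses calls repairPartition(...) once per
            //    out-of-sync partition (see the merge listeners in this patch)

            rr.maybeSendAdditionalWrites(); // 5. speculate if repair acks are late
            rr.awaitWrites();               // 6. only answer the client once enough acks
                                            //    arrive to keep quorum reads monotonic
        }
    }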

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java 
b/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
deleted file mode 100644
index 174c0e7..0000000
--- a/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.reads.repair;
-
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-public interface RepairListener
-{
-    interface PartitionRepair
-    {
-        void reportMutation(InetAddressAndPort endpoint, Mutation mutation);
-        void finish();
-    }
-
-    PartitionRepair startPartitionRepair();
-    void awaitRepairs(long timeoutMillis);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
 
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index f11d264..cb6707d 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -19,9 +19,13 @@
 package org.apache.cassandra.service.reads.repair;
 
 import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
 
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.LivenessInfo;
@@ -49,6 +53,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
     private final boolean isReversed;
     private final InetAddressAndPort[] sources;
     private final ReadCommand command;
+    private final ConsistencyLevel consistency;
 
     private final PartitionUpdate.Builder[] repairs;
 
@@ -64,9 +69,9 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
     // For each source, record if there is an open range to send as repair, and from where.
     private final ClusteringBound[] markerToRepair;
 
-    private final RepairListener repairListener;
+    private final ReadRepair readRepair;
 
-    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener)
+    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
     {
         this.partitionKey = partitionKey;
         this.columns = columns;
@@ -77,7 +82,8 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
         sourceDeletionTime = new DeletionTime[sources.length];
         markerToRepair = new ClusteringBound[sources.length];
         this.command = command;
-        this.repairListener = repairListener;
+        this.consistency = consistency;
+        this.readRepair = readRepair;
 
         this.diffListener = new RowDiffListener()
         {
@@ -300,22 +306,25 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
     public void close()
     {
-        RepairListener.PartitionRepair repair = null;
+        Map<InetAddressAndPort, Mutation> mutations = null;
         for (int i = 0; i < repairs.length; i++)
         {
             if (repairs[i] == null)
                 continue;
 
-            if (repair == null)
-            {
-                repair = repairListener.startPartitionRepair();
-            }
-            repair.reportMutation(sources[i], new Mutation(repairs[i].build()));
+            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i], false);
+            if (mutation == null)
+                continue;
+
+            if (mutations == null)
+                mutations = Maps.newHashMapWithExpectedSize(sources.length);
+
+            mutations.put(sources[i], mutation);
         }
 
-        if (repair != null)
+        if (mutations != null)
         {
-            repair.finish();
+            readRepair.repairPartition(partitionKey, mutations, sources);
         }
     }
 }
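
The rewritten close() above follows a lazy aggregation pattern: build at most one repair mutation per out-of-sync source, skip any source for which createRepairMutation returns null (read here as "repair suppressed", e.g. for an oversized mutation; that interpretation is an assumption, not something this patch states), and hand the surviving map to the ReadRepair in a single repairPartition call. A self-contained sketch of the pattern, with String standing in for the real endpoint and mutation types:

    // Aggregation sketch only; String stands in for the endpoint/mutation types.
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;

    final class PartitionRepairAggregation
    {
        interface MutationFactory
        {
            // Shaped like BlockingReadRepairs.createRepairMutation: a null
            // return means "no repair for this source" (hedged assumption).
            String create(String diff, String source);
        }

        static void close(String[] sources, String[] diffs, MutationFactory factory,
                          BiConsumer<String, Map<String, String>> repairPartition)
        {
            Map<String, String> mutations = null;
            for (int i = 0; i < diffs.length; i++)
            {
                if (diffs[i] == null)      // this source already matches the resolved data
                    continue;

                String mutation = factory.create(diffs[i], sources[i]);
                if (mutation == null)      // repair suppressed for this source
                    continue;

                if (mutations == null)     // allocate lazily: most merges repair nothing
                    mutations = new HashMap<>(sources.length);
                mutations.put(sources[i], mutation);
            }

            if (mutations != null)         // at most one repairPartition call per partition
                repairPartition.accept("partitionKey", mutations);
        }
    }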

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java 
b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 4d4d398..1a5aa7a 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -22,16 +22,12 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.*;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.service.reads.DataResolver;
-import org.apache.cassandra.service.reads.repair.BlockingReadRepair;
-import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -47,8 +43,6 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.reads.repair.ReadRepair;
-import org.apache.cassandra.service.reads.repair.RepairListener;
 import org.apache.cassandra.service.reads.repair.TestableReadRepair;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -118,7 +112,7 @@ public class DataResolverTest
 
         nowInSec = FBUtilities.nowInSeconds();
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
-        readRepair = new TestableReadRepair(command);
+        readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
     }
 
     @Test
@@ -346,7 +340,7 @@ public class DataResolverTest
     @Test
     public void testResolveWithBothEmpty()
     {
-        TestableReadRepair readRepair = new TestableReadRepair(command);
+        TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
         DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
         resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
         resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
@@ -715,7 +709,7 @@ public class DataResolverTest
     public void testResolveComplexDelete()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
         DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
 
         long[] ts = {100, 200};
@@ -767,7 +761,7 @@ public class DataResolverTest
     public void testResolveDeletedCollection()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
         DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
 
         long[] ts = {100, 200};
@@ -810,7 +804,7 @@ public class DataResolverTest
     public void testResolveNewCollection()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
         DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
 
         long[] ts = {100, 200};
@@ -859,7 +853,7 @@ public class DataResolverTest
     public void testResolveNewCollectionOverwritingDeleted()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
         DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
 
         long[] ts = {100, 200};

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
new file mode 100644
index 0000000..75d6e83
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ReadRepairTest
+{
+    static Keyspace ks;
+    static ColumnFamilyStore cfs;
+    static TableMetadata cfm;
+    static InetAddressAndPort target1;
+    static InetAddressAndPort target2;
+    static InetAddressAndPort target3;
+    static List<InetAddressAndPort> targets;
+
+    private static class InstrumentedReadRepairHandler extends BlockingPartitionRepair
+    {
+        public InstrumentedReadRepairHandler(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, InetAddressAndPort[] participants)
+        {
+            super(keyspace, key, consistency, repairs, maxBlockFor, participants);
+        }
+
+        Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
+
+        protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort endpoint)
+        {
+            mutationsSent.put(endpoint, message.payload);
+        }
+
+        List<InetAddressAndPort> candidates = targets;
+
+        protected List<InetAddressAndPort> getCandidateEndpoints()
+        {
+            return candidates;
+        }
+
+        @Override
+        protected boolean isLocal(InetAddressAndPort endpoint)
+        {
+            return targets.contains(endpoint);
+        }
+    }
+
+    static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
+    static DecoratedKey key;
+    static Cell cell1;
+    static Cell cell2;
+    static Cell cell3;
+    static Mutation resolved;
+
+    private static void assertRowsEqual(Row expected, Row actual)
+    {
+        try
+        {
+            Assert.assertEquals(expected == null, actual == null);
+            if (expected == null)
+                return;
+            Assert.assertEquals(expected.clustering(), actual.clustering());
+            Assert.assertEquals(expected.deletion(), actual.deletion());
+            Assert.assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(actual.cells(), Cell.class));
+        }
+        catch (Throwable t)
+        {
+            throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t);
+        }
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        String ksName = "ks";
+
+        cfm = CreateTableStatement.parse("CREATE TABLE tbl (k int primary key, v text)", ksName).build();
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.simple(3), Tables.of(cfm));
+        MigrationManager.announceNewKeyspace(ksm, false);
+
+        ks = Keyspace.open(ksName);
+        cfs = ks.getColumnFamilyStore("tbl");
+
+        cfs.sampleLatencyNanos = 0;
+
+        target1 = InetAddressAndPort.getByName("127.0.0.255");
+        target2 = InetAddressAndPort.getByName("127.0.0.254");
+        target3 = InetAddressAndPort.getByName("127.0.0.253");
+
+        targets = ImmutableList.of(target1, target2, target3);
+
+        // default test values
+        key  = dk(5);
+        cell1 = cell("v", "val1", now);
+        cell2 = cell("v", "val2", now);
+        cell3 = cell("v", "val3", now);
+        resolved = mutation(cell1, cell2);
+    }
+
+    private static DecoratedKey dk(int v)
+    {
+        return DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(v));
+    }
+
+    private static Cell cell(String name, String value, long timestamp)
+    {
+        return BufferCell.live(cfm.getColumn(ColumnIdentifier.getInterned(name, false)), timestamp, ByteBufferUtil.bytes(value));
+    }
+
+    private static Mutation mutation(Cell... cells)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder(0);
+        builder.newRow(Clustering.EMPTY);
+        for (Cell cell: cells)
+        {
+            builder.addCell(cell);
+        }
+        return new Mutation(PartitionUpdate.singleRowUpdate(cfm, key, builder.build()));
+    }
+
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, Collection<InetAddressAndPort> participants)
+    {
+        InetAddressAndPort[] participantArray = new InetAddressAndPort[participants.size()];
+        participants.toArray(participantArray);
+        return new InstrumentedReadRepairHandler(ks, key, ConsistencyLevel.LOCAL_QUORUM, repairs, maxBlockFor, participantArray);
+    }
+
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor)
+    {
+        return createRepairHandler(repairs, maxBlockFor, repairs.keySet());
+    }
+
+    @Test
+    public void consistencyLevelTest() throws Exception
+    {
+        Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, ks));
+    }
+
+    private static void assertMutationEqual(Mutation expected, Mutation actual)
+    {
+        Assert.assertEquals(expected.getKeyspaceName(), actual.getKeyspaceName());
+        Assert.assertEquals(expected.key(), actual.key());
+        PartitionUpdate expectedUpdate = Iterables.getOnlyElement(expected.getPartitionUpdates());
+        PartitionUpdate actualUpdate = Iterables.getOnlyElement(actual.getPartitionUpdates());
+        assertRowsEqual(Iterables.getOnlyElement(expectedUpdate), Iterables.getOnlyElement(actualUpdate));
+    }
+
+    @Test
+    public void additionalMutationRequired() throws Exception
+    {
+        Mutation repair1 = mutation(cell2);
+        Mutation repair2 = mutation(cell1);
+
+        // check that the correct repairs are calculated
+        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, repair1);
+        repairs.put(target2, repair2);
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+
+        // check that the correct mutations are sent
+        handler.sendInitialRepairs();
+        Assert.assertEquals(2, handler.mutationsSent.size());
+        assertMutationEqual(repair1, handler.mutationsSent.get(target1));
+        assertMutationEqual(repair2, handler.mutationsSent.get(target2));
+
+        // check that a combined mutation is speculatively sent to the 3rd target
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        assertMutationEqual(resolved, handler.mutationsSent.get(target3));
+
+        // check repairs stop blocking after receiving 2 acks
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target1);
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target3);
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+
+    /**
+     * If we've received enough acks, we shouldn't send any additional mutations
+     */
+    @Test
+    public void noAdditionalMutationRequired() throws Exception
+    {
+        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell2));
+        repairs.put(target2, mutation(cell1));
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        handler.sendInitialRepairs();
+        handler.ack(target1);
+        handler.ack(target2);
+
+        // both replicas have acked, we shouldn't send anything else out
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+    }
+
+    /**
+     * If there are no additional nodes we can send mutations to, we shouldn't send any
+     */
+    @Test
+    public void noAdditionalMutationPossible() throws Exception
+    {
+        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell2));
+        repairs.put(target2, mutation(cell1));
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        handler.sendInitialRepairs();
+
+        // we've already sent mutations to all candidates, so we shouldn't send any more
+        handler.candidates = Lists.newArrayList(target1, target2);
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+    }
+
+    /**
+     * If we didn't send a repair to a replica because there wasn't a diff with the
+     * resolved column family, we shouldn't send it a speculative mutation
+     */
+    @Test
+    public void mutationsArentSentToInSyncNodes() throws Exception
+    {
+        Mutation repair1 = mutation(cell2);
+
+        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, repair1);
+        Collection<InetAddressAndPort> participants = Lists.newArrayList(target1, target2);
+
+        // check that the correct initial mutations are sent out
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants);
+        handler.sendInitialRepairs();
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target1));
+
+        // check that speculative mutations aren't sent to target2
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target3));
+    }
+
+    @Test
+    public void onlyBlockOnQuorum()
+    {
+        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell1));
+        repairs.put(target2, mutation(cell2));
+        repairs.put(target3, mutation(cell3));
+        Assert.assertEquals(3, repairs.size());
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        handler.sendInitialRepairs();
+
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target1);
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        // here we should stop blocking, even though we've sent 3 repairs
+        handler.ack(target2);
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+
+    /**
+     * For DC-local consistency levels, noop mutations and responses from remote DCs should not
+     * affect the effective blockFor
+     */
+    @Test
+    public void remoteDCTest() throws Exception
+    {
+        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell1));
+
+        InetAddressAndPort remote1 = InetAddressAndPort.getByName("10.0.0.1");
+        InetAddressAndPort remote2 = InetAddressAndPort.getByName("10.0.0.2");
+        repairs.put(remote1, mutation(cell1));
+
+        Collection<InetAddressAndPort> participants = Lists.newArrayList(target1, target2, remote1, remote2);
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants);
+        handler.sendInitialRepairs();
+        Assert.assertEquals(2, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target1));
+        Assert.assertTrue(handler.mutationsSent.containsKey(remote1));
+
+        Assert.assertEquals(1, handler.waitingOn());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        handler.ack(remote1);
+        Assert.assertEquals(1, handler.waitingOn());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        handler.ack(target1);
+        Assert.assertEquals(0, handler.waitingOn());
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+}
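
The tests above pin down the blocking arithmetic: the handler appears to block on min(maxBlockFor, repairs sent to replicas in the local DC), and only acks from local endpoints decrement the outstanding count, which is why remote1's ack leaves waitingOn() at 1 in remoteDCTest. A minimal latch sketch of that inferred behavior (LocalAckLatch is a hypothetical stand-in, not the real BlockingPartitionRepair):

    // Latch sketch inferred from the tests above; String stands in for endpoints.
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Predicate;

    final class LocalAckLatch
    {
        private final Predicate<String> isLocal; // does this endpoint live in our DC?
        private final AtomicInteger waitingOn;

        LocalAckLatch(int localRepairsSent, int maxBlockFor, Predicate<String> isLocal)
        {
            this.isLocal = isLocal;
            // Block on at most maxBlockFor acks, and never on more repairs than
            // were actually sent to local replicas.
            this.waitingOn = new AtomicInteger(Math.min(maxBlockFor, localRepairsSent));
        }

        void ack(String endpoint)
        {
            // Acks from remote datacenters are accepted but never unblock the read.
            if (isLocal.test(endpoint))
                waitingOn.decrementAndGet();
        }

        int waitingOn()
        {
            return Math.max(0, waitingOn.get());
        }

        boolean done()
        {
            return waitingOn() == 0; // like awaitRepairs(0, NANOSECONDS) above
        }
    }

Under these assumptions, new LocalAckLatch(1, 2, dc::contains) reproduces the remoteDCTest expectations: waitingOn() starts at 1, an ack from a remote endpoint leaves it at 1, and the local ack drops it to 0.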

http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java 
b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 5664c9b..f97980b 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
@@ -31,60 +33,59 @@ import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.reads.DigestResolver;
 
-public class TestableReadRepair implements ReadRepair, RepairListener
+public class TestableReadRepair implements ReadRepair
 {
     public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>();
 
     private final ReadCommand command;
+    private final ConsistencyLevel consistency;
 
-    public TestableReadRepair(ReadCommand command)
+    public TestableReadRepair(ReadCommand command, ConsistencyLevel consistency)
     {
         this.command = command;
+        this.consistency = consistency;
     }
 
-    private class TestablePartitionRepair implements RepairListener.PartitionRepair
+    @Override
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
     {
-        @Override
-        public void reportMutation(InetAddressAndPort endpoint, Mutation mutation)
-        {
-            sent.put(endpoint, mutation);
-        }
+        return new PartitionIteratorMergeListener(endpoints, command, consistency, this);
+    }
 
-        @Override
-        public void finish()
-        {
+    @Override
+    public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    {
 
-        }
     }
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+    public void awaitReads() throws ReadTimeoutException
     {
-        return new PartitionIteratorMergeListener(endpoints, command, this);
+
     }
 
     @Override
-    public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    public void maybeSendAdditionalReads()
     {
 
     }
 
     @Override
-    public void awaitRepair() throws ReadTimeoutException
+    public void maybeSendAdditionalWrites()
     {
 
     }
 
     @Override
-    public PartitionRepair startPartitionRepair()
+    public void awaitWrites()
     {
-        return new TestablePartitionRepair();
+
     }
 
     @Override
-    public void awaitRepairs(long timeoutMillis)
+    public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations)
     {
-
+        sent.putAll(mutations);
     }
 
     public Mutation getForEndpoint(InetAddressAndPort endpoint)

