Refactor read executor and response resolver, abstract read repair

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-14058


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39807ba4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39807ba4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39807ba4

Branch: refs/heads/trunk
Commit: 39807ba48ed2e02223014fbf47dce21d4124b380
Parents: 4a50f44
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Tue Nov 14 17:38:10 2017 -0800
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Thu Mar 1 17:52:54 2018 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 .../apache/cassandra/db/ConsistencyLevel.java   |    2 +-
 .../UnfilteredPartitionIterators.java           |   10 +
 .../db/rows/UnfilteredRowIterators.java         |   11 +
 .../apache/cassandra/net/AsyncOneResponse.java  |   71 +-
 .../cassandra/service/AbstractReadExecutor.java |  412 -------
 .../cassandra/service/AsyncRepairCallback.java  |   60 -
 .../apache/cassandra/service/DataResolver.java  |  838 -------------
 .../service/DigestMismatchException.java        |   35 -
 .../cassandra/service/DigestResolver.java       |  103 --
 .../apache/cassandra/service/ReadCallback.java  |  266 -----
 .../cassandra/service/ReadRepairDecision.java   |   23 -
 .../cassandra/service/ResponseResolver.java     |   73 --
 .../apache/cassandra/service/StorageProxy.java  |  148 +--
 .../service/reads/AbstractReadExecutor.java     |  482 ++++++++
 .../service/reads/AsyncRepairCallback.java      |   61 +
 .../cassandra/service/reads/DataResolver.java   |  133 +++
 .../cassandra/service/reads/DigestResolver.java |   93 ++
 .../cassandra/service/reads/ReadCallback.java   |  213 ++++
 .../service/reads/ReadRepairDecision.java       |   23 +
 .../service/reads/ResponseResolver.java         |   66 ++
 .../reads/ShortReadPartitionsProtection.java    |  187 +++
 .../service/reads/ShortReadProtection.java      |   74 ++
 .../service/reads/ShortReadRowsProtection.java  |  197 +++
 .../reads/repair/BlockingReadRepair.java        |  234 ++++
 .../service/reads/repair/NoopReadRepair.java    |   63 +
 .../repair/PartitionIteratorMergeListener.java  |   92 ++
 .../service/reads/repair/ReadRepair.java        |   72 ++
 .../service/reads/repair/RepairListener.java    |   34 +
 .../reads/repair/RowIteratorMergeListener.java  |  336 ++++++
 .../cassandra/service/DataResolverTest.java     | 1122 ------------------
 .../cassandra/service/ReadExecutorTest.java     |  215 ----
 .../service/reads/DataResolverTest.java         | 1057 +++++++++++++++++
 .../service/reads/ReadExecutorTest.java         |  216 ++++
 .../reads/repair/TestableReadRepair.java        |  109 ++
 35 files changed, 3826 insertions(+), 3306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e7a599..a6c0323 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)
  * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
  * Add a few options to nodetool verify (CASSANDRA-14201)
  * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index f93e737..19f2e10 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.ReadRepairDecision;
+import org.apache.cassandra.service.reads.ReadRepairDecision;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index dff6dae..edb7833 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -49,6 +49,16 @@ public abstract class UnfilteredPartitionIterators
     {
         public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions);
         public void close();
+
+        public static MergeListener NOOP = new MergeListener()
+        {
+            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+            {
+                return UnfilteredRowIterators.MergeListener.NOOP;
+            }
+
+            public void close() {}
+        };
     }
 
     @SuppressWarnings("resource") // The created resources are returned right away
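
The NOOP listener added above lets callers drive the same merge path without any read-repair bookkeeping. A minimal usage sketch follows (a fragment, not committed code: iterators and command are assumed to be in scope, mirroring the three-argument merge(...) call visible in the DataResolver sources later in this diff):

    // Sketch only: merge several replica iterators while ignoring per-source differences.
    UnfilteredPartitionIterator merged =
        UnfilteredPartitionIterators.merge(iterators,
                                           command.nowInSec(),
                                           UnfilteredPartitionIterators.MergeListener.NOOP);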

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index e1c6685..0244531 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -95,6 +95,17 @@ public abstract class UnfilteredRowIterators
         public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
 
         public void close();
+
+        public static MergeListener NOOP = new MergeListener()
+        {
+            public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) {}
+
+            public void onMergedRows(Row merged, Row[] versions) {}
+
+            public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) {}
+
+            public void close() {}
+        };
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/net/AsyncOneResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/AsyncOneResponse.java b/src/java/org/apache/cassandra/net/AsyncOneResponse.java
index b7ef227..4e004d2 100644
--- a/src/java/org/apache/cassandra/net/AsyncOneResponse.java
+++ b/src/java/org/apache/cassandra/net/AsyncOneResponse.java
@@ -17,67 +17,54 @@
  */
 package org.apache.cassandra.net;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AbstractFuture;
+
 /**
  * A callback specialized for returning a value from a single target; that is, this is for messages
  * that we only send to one recipient.
  */
-public class AsyncOneResponse<T> implements IAsyncCallback<T>
+public class AsyncOneResponse<T> extends AbstractFuture<T> implements IAsyncCallback<T>
 {
-    private T result;
-    private boolean done;
     private final long start = System.nanoTime();
 
-    public T get(long timeout, TimeUnit tu) throws TimeoutException
+    public void response(MessageIn<T> response)
     {
-        timeout = tu.toNanos(timeout);
-        boolean interrupted = false;
-        try
+        set(response.payload);
+    }
+
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit) throws TimeoutException
+    {
+        long adjustedTimeout = unit.toNanos(timeout) - (System.nanoTime() - start);
+        if (adjustedTimeout <= 0)
         {
-            synchronized (this)
-            {
-                while (!done)
-                {
-                    try
-                    {
-                        long overallTimeout = timeout - (System.nanoTime() - start);
-                        if (overallTimeout <= 0)
-                        {
-                            throw new TimeoutException("Operation timed out.");
-                        }
-                        TimeUnit.NANOSECONDS.timedWait(this, overallTimeout);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        interrupted = true;
-                    }
-                }
-            }
+            throw new TimeoutException("Operation timed out.");
         }
-        finally
+        try
         {
-            if (interrupted)
-            {
-                Thread.currentThread().interrupt();
-            }
+            return super.get(timeout, TimeUnit.NANOSECONDS);
         }
-        return result;
-    }
-
-    public synchronized void response(MessageIn<T> response)
-    {
-        if (!done)
+        catch (InterruptedException | ExecutionException e)
         {
-            result = response.payload;
-            done = true;
-            this.notifyAll();
+            throw new AssertionError(e);
         }
     }
 
-    public boolean isLatencyForSnitch()
+    @VisibleForTesting
+    public static <T> AsyncOneResponse<T> immediate(T value)
     {
-        return false;
+        AsyncOneResponse<T> response = new AsyncOneResponse<>();
+        response.set(value);
+        return response;
     }
 }
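
Because the callback now extends Guava's AbstractFuture, it can be completed directly, which is what the @VisibleForTesting immediate(...) factory above is for. A short illustrative fragment (test-style; not committed code):

    // Sketch only: the future is already complete, so get(...) returns without blocking.
    AsyncOneResponse<String> ack = AsyncOneResponse.immediate("ack");
    String payload = ack.get(1, TimeUnit.SECONDS); // yields "ack"; get(...) declares TimeoutException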

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
deleted file mode 100644
index e06131e..0000000
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.exceptions.ReadFailureException;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.ReadRepairMetrics;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.SpeculativeRetryParam;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-
-/**
- * Sends a read request to the replicas needed to satisfy a given 
ConsistencyLevel.
- *
- * Optionally, may perform additional requests to provide redundancy against 
replica failure:
- * AlwaysSpeculatingReadExecutor will always send a request to one extra 
replica, while
- * SpeculatingReadExecutor will wait until it looks like the original request 
is in danger
- * of timing out before performing extra reads.
- */
-public abstract class AbstractReadExecutor
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(AbstractReadExecutor.class);
-
-    protected final ReadCommand command;
-    protected final List<InetAddressAndPort> targetReplicas;
-    protected final ReadCallback handler;
-    protected final TraceState traceState;
-    protected final ColumnFamilyStore cfs;
-
-    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand 
command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> 
targetReplicas, long queryStartNanoTime)
-    {
-        this.command = command;
-        this.targetReplicas = targetReplicas;
-        this.handler = new ReadCallback(new DigestResolver(keyspace, command, 
consistencyLevel, targetReplicas.size()), consistencyLevel, command, 
targetReplicas, queryStartNanoTime);
-        this.cfs = cfs;
-        this.traceState = Tracing.instance.get();
-
-        // Set the digest version (if we request some digests). This is the 
smallest version amongst all our target replicas since new nodes
-        // knows how to produce older digest but the reverse is not true.
-        // TODO: we need this when talking with pre-3.0 nodes. So if we 
preserve the digest format moving forward, we can get rid of this once
-        // we stop being compatible with pre-3.0 nodes.
-        int digestVersion = MessagingService.current_version;
-        for (InetAddressAndPort replica : targetReplicas)
-            digestVersion = Math.min(digestVersion, 
MessagingService.instance().getVersion(replica));
-        command.setDigestVersion(digestVersion);
-    }
-
-    protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
-    {
-        makeRequests(command, endpoints);
-
-    }
-
-    protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
-    {
-        makeRequests(command.copyAsDigestQuery(), endpoints);
-    }
-
-    private void makeRequests(ReadCommand readCommand, 
Iterable<InetAddressAndPort> endpoints)
-    {
-        boolean hasLocalEndpoint = false;
-
-        for (InetAddressAndPort endpoint : endpoints)
-        {
-            if (StorageProxy.canDoLocalRequest(endpoint))
-            {
-                hasLocalEndpoint = true;
-                continue;
-            }
-
-            if (traceState != null)
-                traceState.trace("reading {} from {}", 
readCommand.isDigestQuery() ? "digest" : "data", endpoint);
-            logger.trace("reading {} from {}", readCommand.isDigestQuery() ? 
"digest" : "data", endpoint);
-            MessageOut<ReadCommand> message = readCommand.createMessage();
-            MessagingService.instance().sendRRWithFailure(message, endpoint, 
handler);
-        }
-
-        // We delay the local (potentially blocking) read till the end to 
avoid stalling remote requests.
-        if (hasLocalEndpoint)
-        {
-            logger.trace("reading {} locally", readCommand.isDigestQuery() ? 
"digest" : "data");
-            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
LocalReadRunnable(command, handler));
-        }
-    }
-
-    /**
-     * Perform additional requests if it looks like the original will time 
out.  May block while it waits
-     * to see if the original requests are answered first.
-     */
-    public abstract void maybeTryAdditionalReplicas();
-
-    /**
-     * Get the replicas involved in the [finished] request.
-     *
-     * @return target replicas + the extra replica, *IF* we speculated.
-     */
-    public abstract Collection<InetAddressAndPort> getContactedReplicas();
-
-    /**
-     * send the initial set of requests
-     */
-    public abstract void executeAsync();
-
-    /**
-     * wait for an answer.  Blocks until success or timeout, so it is caller's
-     * responsibility to call maybeTryAdditionalReplicas first.
-     */
-    public PartitionIterator get() throws ReadFailureException, 
ReadTimeoutException, DigestMismatchException
-    {
-        try
-        {
-            return handler.get();
-        }
-        catch (ReadTimeoutException e)
-        {
-            try
-            {
-                onReadTimeout();
-            }
-            finally
-            {
-                throw e;
-            }
-        }
-    }
-
-    private static ReadRepairDecision newReadRepairDecision(TableMetadata 
metadata)
-    {
-        if (metadata.params.readRepairChance > 0d ||
-            metadata.params.dcLocalReadRepairChance > 0)
-        {
-            double chance = ThreadLocalRandom.current().nextDouble();
-            if (metadata.params.readRepairChance > chance)
-                return ReadRepairDecision.GLOBAL;
-
-            if (metadata.params.dcLocalReadRepairChance > chance)
-                return ReadRepairDecision.DC_LOCAL;
-        }
-
-        return ReadRepairDecision.NONE;
-    }
-
-    /**
-     * @return an executor appropriate for the configured speculative read 
policy
-     */
-    public static AbstractReadExecutor 
getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel 
consistencyLevel, long queryStartNanoTime) throws UnavailableException
-    {
-        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-        List<InetAddressAndPort> allReplicas = 
StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
-        // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do 
not miscount DC responses
-        ReadRepairDecision repairDecision = consistencyLevel == 
ConsistencyLevel.EACH_QUORUM
-                                            ? ReadRepairDecision.NONE
-                                            : 
newReadRepairDecision(command.metadata());
-        List<InetAddressAndPort> targetReplicas = 
consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
-
-        // Throw UAE early if we don't have enough replicas.
-        consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
-
-        if (repairDecision != ReadRepairDecision.NONE)
-        {
-            Tracing.trace("Read-repair {}", repairDecision);
-            ReadRepairMetrics.attempted.mark();
-        }
-
-        ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(command.metadata().id);
-        SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry;
-
-        // Speculative retry is disabled *OR*
-        // 11980: Disable speculative retry if using EACH_QUORUM in order to 
prevent miscounting DC responses
-        if (retry.equals(SpeculativeRetryParam.NONE)
-            | consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime, false);
-
-        // There are simply no extra replicas to speculate.
-        // Handle this separately so it can log failed attempts to speculate 
due to lack of replicas
-        if (consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime, true);
-
-        if (targetReplicas.size() == allReplicas.size())
-        {
-            // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
-            // We are going to contact every node anyway, so ask for 2 full 
data requests instead of 1, for redundancy
-            // (same amount of requests in total, but we turn 1 digest request 
into a full blown data request).
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
-        }
-
-        // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
-        InetAddressAndPort extraReplica = 
allReplicas.get(targetReplicas.size());
-        // With repair decision DC_LOCAL all replicas/target replicas may be 
in different order, so
-        // we might have to find a replacement that's not already in 
targetReplicas.
-        if (repairDecision == ReadRepairDecision.DC_LOCAL && 
targetReplicas.contains(extraReplica))
-        {
-            for (InetAddressAndPort address : allReplicas)
-            {
-                if (!targetReplicas.contains(address))
-                {
-                    extraReplica = address;
-                    break;
-                }
-            }
-        }
-        targetReplicas.add(extraReplica);
-
-        if (retry.equals(SpeculativeRetryParam.ALWAYS))
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
-        else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
-    }
-
-    /**
-     *  Returns true if speculation should occur and if it should then block 
until it is time to
-     *  send the speculative reads
-     */
-    boolean shouldSpeculateAndMaybeWait()
-    {
-        // no latency information, or we're overloaded
-        if (cfs.sampleLatencyNanos > 
TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
-            return false;
-
-        return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
-    }
-
-    void onReadTimeout() {}
-
-    public static class NeverSpeculatingReadExecutor extends 
AbstractReadExecutor
-    {
-        /**
-         * If never speculating due to lack of replicas
-         * log it is as a failure if it should have happened
-         * but couldn't due to lack of replicas
-         */
-        private final boolean logFailedSpeculation;
-
-        public NeverSpeculatingReadExecutor(Keyspace keyspace, 
ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, 
List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean 
logFailedSpeculation)
-        {
-            super(keyspace, cfs, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
-            this.logFailedSpeculation = logFailedSpeculation;
-        }
-
-        public void executeAsync()
-        {
-            makeDataRequests(targetReplicas.subList(0, 1));
-            if (targetReplicas.size() > 1)
-                makeDigestRequests(targetReplicas.subList(1, 
targetReplicas.size()));
-        }
-
-        public void maybeTryAdditionalReplicas()
-        {
-            if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
-            {
-                cfs.metric.speculativeInsufficientReplicas.inc();
-            }
-        }
-
-        public Collection<InetAddressAndPort> getContactedReplicas()
-        {
-            return targetReplicas;
-        }
-    }
-
-    static class SpeculatingReadExecutor extends AbstractReadExecutor
-    {
-        private volatile boolean speculated = false;
-
-        public SpeculatingReadExecutor(Keyspace keyspace,
-                                       ColumnFamilyStore cfs,
-                                       ReadCommand command,
-                                       ConsistencyLevel consistencyLevel,
-                                       List<InetAddressAndPort> targetReplicas,
-                                       long queryStartNanoTime)
-        {
-            super(keyspace, cfs, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
-        }
-
-        public void executeAsync()
-        {
-            // if CL + RR result in covering all replicas, getReadExecutor 
forces AlwaysSpeculating.  So we know
-            // that the last replica in our list is "extra."
-            List<InetAddressAndPort> initialReplicas = 
targetReplicas.subList(0, targetReplicas.size() - 1);
-
-            if (handler.blockfor < initialReplicas.size())
-            {
-                // We're hitting additional targets for read repair.  Since 
our "extra" replica is the least-
-                // preferred by the snitch, we do an extra data read to start 
with against a replica more
-                // likely to reply; better to let RR fail than the entire 
query.
-                makeDataRequests(initialReplicas.subList(0, 2));
-                if (initialReplicas.size() > 2)
-                    makeDigestRequests(initialReplicas.subList(2, 
initialReplicas.size()));
-            }
-            else
-            {
-                // not doing read repair; all replies are important, so it 
doesn't matter which nodes we
-                // perform data reads against vs digest.
-                makeDataRequests(initialReplicas.subList(0, 1));
-                if (initialReplicas.size() > 1)
-                    makeDigestRequests(initialReplicas.subList(1, 
initialReplicas.size()));
-            }
-        }
-
-        public void maybeTryAdditionalReplicas()
-        {
-            if (shouldSpeculateAndMaybeWait())
-            {
-                //Handle speculation stats first in case the callback fires 
immediately
-                speculated = true;
-                cfs.metric.speculativeRetries.inc();
-                // Could be waiting on the data, or on enough digests.
-                ReadCommand retryCommand = command;
-                if (handler.resolver.isDataPresent())
-                    retryCommand = command.copyAsDigestQuery();
-
-                InetAddressAndPort extraReplica = 
Iterables.getLast(targetReplicas);
-                if (traceState != null)
-                    traceState.trace("speculating read retry on {}", 
extraReplica);
-                logger.trace("speculating read retry on {}", extraReplica);
-                
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), 
extraReplica, handler);
-            }
-        }
-
-        public Collection<InetAddressAndPort> getContactedReplicas()
-        {
-            return speculated
-                 ? targetReplicas
-                 : targetReplicas.subList(0, targetReplicas.size() - 1);
-        }
-
-        @Override
-        void onReadTimeout()
-        {
-            //Shouldn't be possible to get here without first attempting to 
speculate even if the
-            //timing is bad
-            assert speculated;
-            cfs.metric.speculativeFailedRetries.inc();
-        }
-    }
-
-    private static class AlwaysSpeculatingReadExecutor extends 
AbstractReadExecutor
-    {
-        public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
-                                             ColumnFamilyStore cfs,
-                                             ReadCommand command,
-                                             ConsistencyLevel consistencyLevel,
-                                             List<InetAddressAndPort> 
targetReplicas,
-                                             long queryStartNanoTime)
-        {
-            super(keyspace, cfs, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
-        }
-
-        public void maybeTryAdditionalReplicas()
-        {
-            // no-op
-        }
-
-        public Collection<InetAddressAndPort> getContactedReplicas()
-        {
-            return targetReplicas;
-        }
-
-        @Override
-        public void executeAsync()
-        {
-            makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 
1 ? 2 : 1));
-            if (targetReplicas.size() > 2)
-                makeDigestRequests(targetReplicas.subList(2, 
targetReplicas.size()));
-            cfs.metric.speculativeRetries.inc();
-        }
-
-        @Override
-        void onReadTimeout()
-        {
-            cfs.metric.speculativeFailedRetries.inc();
-        }
-    }
-}
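
The executor removed above (re-created under service/reads in this commit) is built around speculative reads: send the initial requests, wait roughly the table's observed latency threshold, and only contact an extra replica if the originals have not answered yet. The following is a self-contained sketch of that pattern in plain Java; the class and names are illustrative, not the Cassandra implementation:

    import java.util.concurrent.*;

    public class SpeculativeReadSketch
    {
        // Hypothetical stand-in for an asynchronous replica read.
        static CompletableFuture<String> read(String replica, Executor pool)
        {
            return CompletableFuture.supplyAsync(() -> "data-from-" + replica, pool);
        }

        public static void main(String[] args) throws Exception
        {
            ExecutorService pool = Executors.newCachedThreadPool();
            long speculationThresholdMillis = 50; // plays the role of cfs.sampleLatencyNanos

            CompletableFuture<String> initial = read("replica-1", pool);
            String result;
            try
            {
                // Give the initial request the benefit of the doubt for the threshold duration.
                result = initial.get(speculationThresholdMillis, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e)
            {
                // Looks slow: race the original request against one extra replica.
                CompletableFuture<String> extra = read("replica-2", pool);
                result = (String) CompletableFuture.anyOf(initial, extra).get();
            }
            System.out.println(result);
            pool.shutdown();
        }
    }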

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
deleted file mode 100644
index d613f3d..0000000
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
-{
-    private final DataResolver repairResolver;
-    private final int blockfor;
-    protected final AtomicInteger received = new AtomicInteger(0);
-
-    public AsyncRepairCallback(DataResolver repairResolver, int blockfor)
-    {
-        this.repairResolver = repairResolver;
-        this.blockfor = blockfor;
-    }
-
-    public void response(MessageIn<ReadResponse> message)
-    {
-        repairResolver.preprocess(message);
-        if (received.incrementAndGet() == blockfor)
-        {
-            StageManager.getStage(Stage.READ_REPAIR).execute(new 
WrappedRunnable()
-            {
-                protected void runMayThrow()
-                {
-                    repairResolver.compareResponses();
-                }
-            });
-        }
-    }
-
-    public boolean isLatencyForSnitch()
-    {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
deleted file mode 100644
index 82db754..0000000
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ /dev/null
@@ -1,838 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.*;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.filter.DataLimits.Counter;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.ExcludingBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class DataResolver extends ResponseResolver
-{
-    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
-        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
-
-    @VisibleForTesting
-    final List<AsyncOneResponse> repairResults = 
Collections.synchronizedList(new ArrayList<>());
-    private final long queryStartNanoTime;
-    private final boolean enforceStrictLiveness;
-
-    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel 
consistency, int maxResponseCount, long queryStartNanoTime)
-    {
-        super(keyspace, command, consistency, maxResponseCount);
-        this.queryStartNanoTime = queryStartNanoTime;
-        this.enforceStrictLiveness = 
command.metadata().enforceStrictLiveness();
-    }
-
-    public PartitionIterator getData()
-    {
-        ReadResponse response = responses.iterator().next().payload;
-        return 
UnfilteredPartitionIterators.filter(response.makeIterator(command), 
command.nowInSec());
-    }
-
-    public boolean isDataPresent()
-    {
-        return !responses.isEmpty();
-    }
-
-    public void compareResponses()
-    {
-        // We need to fully consume the results to trigger read repairs if 
appropriate
-        try (PartitionIterator iterator = resolve())
-        {
-            PartitionIterators.consume(iterator);
-        }
-    }
-
-    public PartitionIterator resolve()
-    {
-        // We could get more responses while this method runs, which is ok 
(we're happy to ignore any response not here
-        // at the beginning of this method), so grab the response count once 
and use that through the method.
-        int count = responses.size();
-        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
-        InetAddressAndPort[] sources = new InetAddressAndPort[count];
-        for (int i = 0; i < count; i++)
-        {
-            MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator(command));
-            sources[i] = msg.from;
-        }
-
-        /*
-         * Even though every response, individually, will honor the limit, it 
is possible that we will, after the merge,
-         * have more rows than the client requested. To make sure that we 
still conform to the original limit,
-         * we apply a top-level post-reconciliation counter to the merged 
partition iterator.
-         *
-         * Short read protection logic 
(ShortReadRowsProtection.moreContents()) relies on this counter to be applied
-         * to the current partition to work. For this reason we have to apply 
the counter transformation before
-         * empty partition discard logic kicks in - for it will eagerly 
consume the iterator.
-         *
-         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 
4) discard empty partitions
-         *
-         * See CASSANDRA-13747 for more details.
-         */
-
-        DataLimits.Counter mergedResultCounter =
-            command.limits().newCounter(command.nowInSec(), true, 
command.selectsFullPartition(), enforceStrictLiveness);
-
-        UnfilteredPartitionIterator merged = 
mergeWithShortReadProtection(iters, sources, mergedResultCounter);
-        FilteredPartitions filtered =
-            FilteredPartitions.filter(merged, new Filter(command.nowInSec(), 
command.metadata().enforceStrictLiveness()));
-        PartitionIterator counted = Transformation.apply(filtered, 
mergedResultCounter);
-        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
-    }
-
-    private UnfilteredPartitionIterator 
mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
-                                                                     
InetAddressAndPort[] sources,
-                                                                     
DataLimits.Counter mergedResultCounter)
-    {
-        // If we have only one results, there is no read repair to do and we 
can't get short reads
-        if (results.size() == 1)
-            return results.get(0);
-
-        /*
-         * So-called short reads stems from nodes returning only a subset of 
the results they have due to the limit,
-         * but that subset not being enough post-reconciliation. So if we 
don't have a limit, don't bother.
-         */
-        if (!command.limits().isUnlimited())
-            for (int i = 0; i < results.size(); i++)
-                results.set(i, extendWithShortReadProtection(results.get(i), 
sources[i], mergedResultCounter));
-
-        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), 
new RepairMergeListener(sources));
-    }
-
-    private class RepairMergeListener implements 
UnfilteredPartitionIterators.MergeListener
-    {
-        private final InetAddressAndPort[] sources;
-
-        private RepairMergeListener(InetAddressAndPort[] sources)
-        {
-            this.sources = sources;
-        }
-
-        public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
-        {
-            return new MergeListener(partitionKey, columns(versions), 
isReversed(versions));
-        }
-
-        private RegularAndStaticColumns columns(List<UnfilteredRowIterator> 
versions)
-        {
-            Columns statics = Columns.NONE;
-            Columns regulars = Columns.NONE;
-            for (UnfilteredRowIterator iter : versions)
-            {
-                if (iter == null)
-                    continue;
-
-                RegularAndStaticColumns cols = iter.columns();
-                statics = statics.mergeTo(cols.statics);
-                regulars = regulars.mergeTo(cols.regulars);
-            }
-            return new RegularAndStaticColumns(statics, regulars);
-        }
-
-        private boolean isReversed(List<UnfilteredRowIterator> versions)
-        {
-            for (UnfilteredRowIterator iter : versions)
-            {
-                if (iter == null)
-                    continue;
-
-                // Everything will be in the same order
-                return iter.isReverseOrder();
-            }
-
-            assert false : "Expected at least one iterator";
-            return false;
-        }
-
-        public void close()
-        {
-            try
-            {
-                FBUtilities.waitOnFutures(repairResults, 
DatabaseDescriptor.getWriteRpcTimeout());
-            }
-            catch (TimeoutException ex)
-            {
-                // We got all responses, but timed out while repairing
-                int blockFor = consistency.blockFor(keyspace);
-                if (Tracing.isTracing())
-                    Tracing.trace("Timed out while read-repairing after 
receiving all {} data and digest responses", blockFor);
-                else
-                    logger.debug("Timeout while read-repairing after receiving 
all {} data and digest responses", blockFor);
-
-                throw new ReadTimeoutException(consistency, blockFor-1, 
blockFor, true);
-            }
-        }
-
-        private class MergeListener implements 
UnfilteredRowIterators.MergeListener
-        {
-            private final DecoratedKey partitionKey;
-            private final RegularAndStaticColumns columns;
-            private final boolean isReversed;
-            private final PartitionUpdate.Builder[] repairs = new 
PartitionUpdate.Builder[sources.length];
-
-            private final Row.Builder[] currentRows = new 
Row.Builder[sources.length];
-            private final RowDiffListener diffListener;
-
-            // The partition level deletion for the merge row.
-            private DeletionTime partitionLevelDeletion;
-            // When merged has a currently open marker, its time. null 
otherwise.
-            private DeletionTime mergedDeletionTime;
-            // For each source, the time of the current deletion as known by 
the source.
-            private final DeletionTime[] sourceDeletionTime = new 
DeletionTime[sources.length];
-            // For each source, record if there is an open range to send as 
repair, and from where.
-            private final ClusteringBound[] markerToRepair = new 
ClusteringBound[sources.length];
-
-            private MergeListener(DecoratedKey partitionKey, 
RegularAndStaticColumns columns, boolean isReversed)
-            {
-                this.partitionKey = partitionKey;
-                this.columns = columns;
-                this.isReversed = isReversed;
-
-                this.diffListener = new RowDiffListener()
-                {
-                    public void onPrimaryKeyLivenessInfo(int i, Clustering 
clustering, LivenessInfo merged, LivenessInfo original)
-                    {
-                        if (merged != null && !merged.equals(original))
-                            currentRow(i, 
clustering).addPrimaryKeyLivenessInfo(merged);
-                    }
-
-                    public void onDeletion(int i, Clustering clustering, 
Row.Deletion merged, Row.Deletion original)
-                    {
-                        if (merged != null && !merged.equals(original))
-                            currentRow(i, clustering).addRowDeletion(merged);
-                    }
-
-                    public void onComplexDeletion(int i, Clustering 
clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
-                    {
-                        if (merged != null && !merged.equals(original))
-                            currentRow(i, 
clustering).addComplexDeletion(column, merged);
-                    }
-
-                    public void onCell(int i, Clustering clustering, Cell 
merged, Cell original)
-                    {
-                        if (merged != null && !merged.equals(original) && 
isQueried(merged))
-                            currentRow(i, clustering).addCell(merged);
-                    }
-
-                    private boolean isQueried(Cell cell)
-                    {
-                        // When we read, we may have some cell that have been 
fetched but are not selected by the user. Those cells may
-                        // have empty values as optimization (see 
CASSANDRA-10655) and hence they should not be included in the read-repair.
-                        // This is fine since those columns are not actually 
requested by the user and are only present for the sake of CQL
-                        // semantic (making sure we can always distinguish 
between a row that doesn't exist from one that do exist but has
-                        /// no value for the column requested by the user) and 
so it won't be unexpected by the user that those columns are
-                        // not repaired.
-                        ColumnMetadata column = cell.column();
-                        ColumnFilter filter = command.columnFilter();
-                        return column.isComplex() ? 
filter.fetchedCellIsQueried(column, cell.path()) : 
filter.fetchedColumnIsQueried(column);
-                    }
-                };
-            }
-
-            private PartitionUpdate.Builder update(int i)
-            {
-                if (repairs[i] == null)
-                    repairs[i] = new 
PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
-                return repairs[i];
-            }
-
-            /**
-             * The partition level deletion with with which source {@code i} 
is currently repaired, or
-             * {@code DeletionTime.LIVE} if the source is not repaired on the 
partition level deletion (meaning it was
-             * up to date on it). The output* of this method is only valid 
after the call to
-             * {@link #onMergedPartitionLevelDeletion}.
-             */
-            private DeletionTime partitionLevelRepairDeletion(int i)
-            {
-                return repairs[i] == null ? DeletionTime.LIVE : 
repairs[i].partitionLevelDeletion();
-            }
-
-            private Row.Builder currentRow(int i, Clustering clustering)
-            {
-                if (currentRows[i] == null)
-                {
-                    currentRows[i] = BTreeRow.sortedBuilder();
-                    currentRows[i].newRow(clustering);
-                }
-                return currentRows[i];
-            }
-
-            public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
-            {
-                this.partitionLevelDeletion = mergedDeletion;
-                for (int i = 0; i < versions.length; i++)
-                {
-                    if (mergedDeletion.supersedes(versions[i]))
-                        update(i).addPartitionDeletion(mergedDeletion);
-                }
-            }
-
-            public void onMergedRows(Row merged, Row[] versions)
-            {
-                // If a row was shadowed post merged, it must be by a 
partition level or range tombstone, and we handle
-                // those case directly in their respective methods (in other 
words, it would be inefficient to send a row
-                // deletion as repair when we know we've already send a 
partition level or range tombstone that covers it).
-                if (merged.isEmpty())
-                    return;
-
-                Rows.diff(diffListener, merged, versions);
-                for (int i = 0; i < currentRows.length; i++)
-                {
-                    if (currentRows[i] != null)
-                        update(i).add(currentRows[i].build());
-                }
-                Arrays.fill(currentRows, null);
-            }
-
-            private DeletionTime currentDeletion()
-            {
-                return mergedDeletionTime == null ? partitionLevelDeletion : 
mergedDeletionTime;
-            }
-
-            public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker 
merged, RangeTombstoneMarker[] versions)
-            {
-                try
-                {
-                    // The code for merging range tombstones is a tad complex 
and we had the assertions there triggered
-                    // unexpectedly in a few occasions (CASSANDRA-13237, 
CASSANDRA-13719). It's hard to get insights
-                    // when that happen without more context that what the 
assertion errors give us however, hence the
-                    // catch here that basically gather as much as context as 
reasonable.
-                    internalOnMergedRangeTombstoneMarkers(merged, versions);
-                }
-                catch (AssertionError e)
-                {
-                    // The following can be pretty verbose, but it's really 
only triggered if a bug happen, so we'd
-                    // rather get more info to debug than not.
-                    TableMetadata table = command.metadata();
-                    String details = String.format("Error merging RTs on %s: 
merged=%s, versions=%s, sources={%s}, responses:%n %s",
-                                                   table,
-                                                   merged == null ? "null" : 
merged.toString(table),
-                                                   '[' + Joiner.on(", 
").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" 
: rt.toString(table))) + ']',
-                                                   Arrays.toString(sources),
-                                                   makeResponsesDebugString());
-                    throw new AssertionError(details, e);
-                }
-            }
-
-            private String makeResponsesDebugString()
-            {
-                return Joiner.on(",\n")
-                             .join(Iterables.transform(getMessages(), m -> 
m.from + " => " + m.payload.toDebugString(command, partitionKey)));
-            }
-
-            private void 
internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, 
RangeTombstoneMarker[] versions)
-            {
-                // The current deletion as of dealing with this marker.
-                DeletionTime currentDeletion = currentDeletion();
-
-                for (int i = 0; i < versions.length; i++)
-                {
-                    RangeTombstoneMarker marker = versions[i];
-
-                    // Update what the source now thinks is the current 
deletion
-                    if (marker != null)
-                        sourceDeletionTime[i] = marker.isOpen(isReversed) ? 
marker.openDeletionTime(isReversed) : null;
-
-                    // If merged == null, some of the source is opening or 
closing a marker
-                    if (merged == null)
-                    {
-                        // but if it's not this source, move to the next one
-                        if (marker == null)
-                            continue;
-
-                        // We have a close and/or open marker for a source, 
with nothing corresponding in merged.
-                        // Because merged is a superset, this imply that we 
have a current deletion (being it due to an
-                        // early opening in merged or a partition level 
deletion) and that this deletion will still be
-                        // active after that point. Further whatever deletion 
was open or is open by this marker on the
-                        // source, that deletion cannot supersedes the current 
one.
-                        //
-                        // But while the marker deletion (before and/or after 
this point) cannot supersede the current
-                        // deletion, we want to know if it's equal to it (both 
before and after), because in that case
-                        // the source is up to date and we don't want to 
include repair.
-                        //
-                        // So in practice we have 2 possible case:
-                        //  1) the source was up-to-date on deletion up to 
that point: then it won't be from that point
-                        //     on unless it's a boundary and the new opened 
deletion time is also equal to the current
-                        //     deletion (note that this implies the boundary 
has the same closing and opening deletion
-                        //     time, which should generally not happen, but 
can due to legacy reading code not avoiding
-                        //     this for a while, see CASSANDRA-13237).
-                        //  2) the source wasn't up-to-date on deletion up to 
that point and it may now be (if it isn't
-                        //     we just have nothing to do for that marker).
-                        assert !currentDeletion.isLive() : 
currentDeletion.toString();
-
-                        // Is the source up to date on deletion? It's up to 
date if it doesn't have an open RT repair
-                        // nor an "active" partition level deletion (where 
"active" means that it's greater or equal
-                        // to the current deletion: if the source has a 
repaired partition deletion lower than the
-                        // current deletion, this means the current deletion 
is due to a previously open range tombstone,
-                        // and if the source isn't currently repaired for that 
RT, then it means it's up to date on it).
-                        DeletionTime partitionRepairDeletion = 
partitionLevelRepairDeletion(i);
-                        if (markerToRepair[i] == null && 
currentDeletion.supersedes(partitionRepairDeletion))
-                        {
-                            // Since there is an ongoing merged deletion, the 
only way we don't have an open repair for
-                            // this source is that it had a range open with 
the same deletion as current and it's
-                            // closing it.
-                            assert marker.isClose(isReversed) && 
currentDeletion.equals(marker.closeDeletionTime(isReversed))
-                                 : String.format("currentDeletion=%s, 
marker=%s", currentDeletion, marker.toString(command.metadata()));
-
-                            // and so unless it's a boundary whose opening 
deletion time is still equal to the current
-                            // deletion (see comment above for why this can 
actually happen), we have to repair the source
-                            // from that point on.
-                            if (!(marker.isOpen(isReversed) && 
currentDeletion.equals(marker.openDeletionTime(isReversed))))
-                                markerToRepair[i] = 
marker.closeBound(isReversed).invert();
-                        }
-                        // In case 2) above, we only have something to do if 
the source is up-to-date after that point
-                        // (which, since the source isn't up-to-date before 
that point, means we're opening a new deletion
-                        // that is equal to the current one).
-                        else if (marker.isOpen(isReversed) && 
currentDeletion.equals(marker.openDeletionTime(isReversed)))
-                        {
-                            closeOpenMarker(i, 
marker.openBound(isReversed).invert());
-                        }
-                    }
-                    else
-                    {
-                        // We have a change of current deletion in merged 
(potentially to/from no deletion at all).
-
-                        if (merged.isClose(isReversed))
-                        {
-                            // We're closing the merged range. If we're 
recorded that this should be repaird for the
-                            // source, close and add said range to the repair 
to send.
-                            if (markerToRepair[i] != null)
-                                closeOpenMarker(i, 
merged.closeBound(isReversed));
-
-                        }
-
-                        if (merged.isOpen(isReversed))
-                        {
-                            // If we're opening a new merged range (or just 
switching deletion), then unless the source
-                            // is up to date on that deletion (note that we've 
updated what the source deleteion is
-                            // above), we'll have to sent the range to the 
source.
-                            DeletionTime newDeletion = 
merged.openDeletionTime(isReversed);
-                            DeletionTime sourceDeletion = 
sourceDeletionTime[i];
-                            if (!newDeletion.equals(sourceDeletion))
-                                markerToRepair[i] = 
merged.openBound(isReversed);
-                        }
-                    }
-                }
-
-                if (merged != null)
-                    mergedDeletionTime = merged.isOpen(isReversed) ? 
merged.openDeletionTime(isReversed) : null;
-            }
-
-            private void closeOpenMarker(int i, ClusteringBound close)
-            {
-                ClusteringBound open = markerToRepair[i];
-                update(i).add(new RangeTombstone(Slice.make(isReversed ? close 
: open, isReversed ? open : close), currentDeletion()));
-                markerToRepair[i] = null;
-            }
-
-            public void close()
-            {
-                for (int i = 0; i < repairs.length; i++)
-                    if (null != repairs[i])
-                        sendRepairMutation(repairs[i].build(), sources[i]);
-            }
-
-            private void sendRepairMutation(PartitionUpdate partition, 
InetAddressAndPort destination)
-            {
-                Mutation mutation = new Mutation(partition);
-                int messagingVersion = 
MessagingService.instance().getVersion(destination);
-
-                int    mutationSize = (int) 
Mutation.serializer.serializedSize(mutation, messagingVersion);
-                int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
-
-                if (mutationSize <= maxMutationSize)
-                {
-                    Tracing.trace("Sending read-repair-mutation to {}", 
destination);
-                    // use a separate verb here to avoid writing hints on 
timeouts
-                    MessageOut<Mutation> message = 
mutation.createMessage(MessagingService.Verb.READ_REPAIR);
-                    
repairResults.add(MessagingService.instance().sendRR(message, destination));
-                    
ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark();
-                }
-                else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
-                {
-                    logger.debug("Encountered an oversized ({}/{}) read repair 
mutation for table {}, key {}, node {}",
-                                 mutationSize,
-                                 maxMutationSize,
-                                 command.metadata(),
-                                 
command.metadata().partitionKeyType.getString(partitionKey.getKey()),
-                                 destination);
-                }
-                else
-                {
-                    logger.warn("Encountered an oversized ({}/{}) read repair 
mutation for table {}, key {}, node {}",
-                                mutationSize,
-                                maxMutationSize,
-                                command.metadata(),
-                                
command.metadata().partitionKeyType.getString(partitionKey.getKey()),
-                                destination);
-
-                    int blockFor = consistency.blockFor(keyspace);
-                    Tracing.trace("Timed out while read-repairing after 
receiving all {} data and digest responses", blockFor);
-                    throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
-                }
-            }
-        }
-    }
-
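As a reading aid for the removed sendRepairMutation() above: its size handling is a three-way decision -- send the repair mutation if it fits, silently drop it (debug log only) when it is oversized and the drop flag is set, or warn and fail the read with a timeout otherwise. The sketch below is illustrative only, plain Java with hypothetical names rather than the Cassandra API; the real method also serialises the Mutation for the destination's messaging version, marks table metrics and traces the request.

// Illustrative sketch; "dropOversized" stands in for DROP_OVERSIZED_READ_REPAIR_MUTATIONS.
final class ReadRepairMutationSizeCheck
{
    enum Outcome { SEND, DROP_SILENTLY, FAIL_WITH_TIMEOUT }

    static Outcome decide(long mutationSize, long maxMutationSize, boolean dropOversized)
    {
        if (mutationSize <= maxMutationSize)
            return Outcome.SEND;              // fits: ship the read-repair mutation to the replica
        if (dropOversized)
            return Outcome.DROP_SILENTLY;     // oversized, drop flag set: log at debug and skip
        return Outcome.FAIL_WITH_TIMEOUT;     // oversized otherwise: warn and surface a timeout
    }

    public static void main(String[] args)
    {
        System.out.println(decide(10_000, 16L << 20, false));    // SEND
        System.out.println(decide(32L << 20, 16L << 20, true));  // DROP_SILENTLY
        System.out.println(decide(32L << 20, 16L << 20, false)); // FAIL_WITH_TIMEOUT
    }
}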
-    private UnfilteredPartitionIterator 
extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
-                                                                      
InetAddressAndPort source,
-                                                                      
DataLimits.Counter mergedResultCounter)
-    {
-        DataLimits.Counter singleResultCounter =
-            command.limits().newCounter(command.nowInSec(), false, 
command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
-
-        ShortReadPartitionsProtection protection =
-            new ShortReadPartitionsProtection(source, singleResultCounter, 
mergedResultCounter, queryStartNanoTime);
-
-        /*
-         * The order of extension and transformations is important here. Extending with more partitions has to happen
-         * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will
-         * be called on the first partition of the extended iterator.
-         *
-         * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will
-         * be called last, after the extension done by the SRPP.applyToPartition() call. That way we preserve the same order
-         * when it comes to calling SRPP.moreContents() and applyToRow() callbacks.
-         *
-         * See ShortReadPartitionsProtection.applyToPartition() for more details.
-         */
-
-        // extend with moreContents() only if it's a range read command with 
no partition key specified
-        if (!command.isLimitedToOnePartition())
-            partitions = MorePartitions.extend(partitions, protection);     // 
register SRPP.moreContents()
-
-        partitions = Transformation.apply(partitions, protection);          // 
register SRPP.applyToPartition()
-        partitions = Transformation.apply(partitions, singleResultCounter); // 
register the per-source counter
-
-        return partitions;
-    }
-
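The ordering constraint described in the comment inside extendWithShortReadProtection() above can be illustrated without Cassandra's Transformation/MorePartitions machinery. The following framework-free analogy in plain java.util terms is only a sketch of the general idea: a counting wrapper observes exactly what the wrappers applied before it produce, so the extension must be applied first if the per-source counter is to see the extra elements as well.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: plain iterators instead of BaseIterator/Transformation.
// The counter wraps the already-extended iterator, so it also counts the
// appended element -- mirroring why SRPP is registered before the per-source counter.
final class WrapOrderDemo
{
    static <T> Iterator<T> counting(Iterator<T> inner, AtomicInteger counter)
    {
        return new Iterator<T>()
        {
            public boolean hasNext() { return inner.hasNext(); }
            public T next() { counter.incrementAndGet(); return inner.next(); }
        };
    }

    static <T> Iterator<T> extendWith(Iterator<T> inner, Iterator<T> more)
    {
        return new Iterator<T>()
        {
            public boolean hasNext() { return inner.hasNext() || more.hasNext(); }
            public T next() { return inner.hasNext() ? inner.next() : more.next(); }
        };
    }

    public static void main(String[] args)
    {
        AtomicInteger counted = new AtomicInteger();
        Iterator<String> base = List.of("a", "b").iterator();
        Iterator<String> extended = extendWith(base, List.of("c").iterator()); // extend first
        Iterator<String> observed = counting(extended, counted);               // then count
        observed.forEachRemaining(x -> {});
        System.out.println(counted.get()); // 3 -- the counter saw the extension's element too
    }
}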
-    /*
-     * We have a potential short read if the result from a given node contains the requested number of rows
-     * (i.e. it has stopped returning results due to the limit), but some of them haven't
-     * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones.
-     *
-     * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
-     * ultimately return fewer rows than required. Also, those additional rows may contain tombstones which
-     * we also need to fetch as they may shadow rows or partitions from other replicas' results, which we would
-     * otherwise return incorrectly.
-     */
-    private class ShortReadPartitionsProtection extends 
Transformation<UnfilteredRowIterator> implements 
MorePartitions<UnfilteredPartitionIterator>
-    {
-        private final InetAddressAndPort source;
-
-        private final DataLimits.Counter singleResultCounter; // unmerged 
per-source counter
-        private final DataLimits.Counter mergedResultCounter; // merged 
end-result counter
-
-        private DecoratedKey lastPartitionKey; // key of the last observed 
partition
-
-        private boolean partitionsFetched; // whether we've seen any new 
partitions since iteration start or last moreContents() call
-
-        private final long queryStartNanoTime;
-
-        private ShortReadPartitionsProtection(InetAddressAndPort source,
-                                              DataLimits.Counter 
singleResultCounter,
-                                              DataLimits.Counter 
mergedResultCounter,
-                                              long queryStartNanoTime)
-        {
-            this.source = source;
-            this.singleResultCounter = singleResultCounter;
-            this.mergedResultCounter = mergedResultCounter;
-            this.queryStartNanoTime = queryStartNanoTime;
-        }
-
-        @Override
-        public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator 
partition)
-        {
-            partitionsFetched = true;
-
-            lastPartitionKey = partition.partitionKey();
-
-            /*
-             * Extend with moreContents() first, then apply the protection so that lastClustering is tracked by applyToRow().
-             *
-             * If we don't apply the transformation *after* extending the partition with MoreRows,
-             * the applyToRow() method of the protection will not be called on the first row of the new extension iterator.
-             */
-            ShortReadRowsProtection protection = new 
ShortReadRowsProtection(partition.metadata(), partition.partitionKey());
-            return Transformation.apply(MoreRows.extend(partition, 
protection), protection);
-        }
-
-        /*
-         * We only get here once all the rows and partitions in this iterator 
have been iterated over, and so
-         * if the node had returned the requested number of rows but we still 
get here, then some results were
-         * skipped during reconciliation.
-         */
-        public UnfilteredPartitionIterator moreContents()
-        {
-            // never try to request additional partitions from replicas if our 
reconciled partitions are already filled to the limit
-            assert !mergedResultCounter.isDone();
-
-            // we do not apply short read protection when we have no limits at 
all
-            assert !command.limits().isUnlimited();
-
-            /*
-             * If this is a single partition read command or an (indexed) partition range read command with
-             * a partition key specified, then we can't and shouldn't try to fetch more partitions.
-             */
-            assert !command.isLimitedToOnePartition();
-
-            /*
-             * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
-             *
-             * Can only take the shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
-             * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
-             */
-            if (!singleResultCounter.isDone() && 
command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
-                return null;
-
-            /*
-             * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
-             * There is no point in asking the replica for more rows - it has no more in the requested range.
-             */
-            if (!partitionsFetched)
-                return null;
-            partitionsFetched = false;
-
-            /*
-             * We are going to fetch one partition at a time for thrift and potentially more for CQL.
-             * The row limit will either be set to the per-partition limit (if the command has no total row limit set) or
-             * to the total number of rows remaining (if it has one). If we don't grab enough rows in some of the partitions,
-             * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
-             */
-            int toQuery = command.limits().count() != DataLimits.NO_LIMIT
-                        ? command.limits().count() - 
counted(mergedResultCounter)
-                        : command.limits().perPartitionCount();
-
-            
ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
-            Tracing.trace("Requesting {} extra rows from {} for short read 
protection", toQuery, source);
-
-            PartitionRangeReadCommand cmd = 
makeFetchAdditionalPartitionReadCommand(toQuery);
-            return executeReadCommand(cmd);
-        }
-
-        // Counts the number of rows for regular queries and the number of 
groups for GROUP BY queries
-        private int counted(Counter counter)
-        {
-            return command.limits().isGroupByLimit()
-                 ? counter.rowCounted()
-                 : counter.counted();
-        }
-
-        private PartitionRangeReadCommand 
makeFetchAdditionalPartitionReadCommand(int toQuery)
-        {
-            PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) 
command;
-
-            DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
-
-            AbstractBounds<PartitionPosition> bounds = 
cmd.dataRange().keyRange();
-            AbstractBounds<PartitionPosition> newBounds = 
bounds.inclusiveRight()
-                                                        ? new 
Range<>(lastPartitionKey, bounds.right)
-                                                        : new 
ExcludingBounds<>(lastPartitionKey, bounds.right);
-            DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
-
-            return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
-        }
-
-        private class ShortReadRowsProtection extends Transformation 
implements MoreRows<UnfilteredRowIterator>
-        {
-            private final TableMetadata metadata;
-            private final DecoratedKey partitionKey;
-
-            private Clustering lastClustering; // clustering of the last 
observed row
-
-            private int lastCounted = 0; // last seen recorded # before 
attempting to fetch more rows
-            private int lastFetched = 0; // # rows returned by last attempt to 
get more (or by the original read command)
-            private int lastQueried = 0; // # extra rows requested from the 
replica last time
-
-            private ShortReadRowsProtection(TableMetadata metadata, 
DecoratedKey partitionKey)
-            {
-                this.metadata = metadata;
-                this.partitionKey = partitionKey;
-            }
-
-            @Override
-            public Row applyToRow(Row row)
-            {
-                lastClustering = row.clustering();
-                return row;
-            }
-
-            /*
-             * We only get here once all the rows in this iterator have been 
iterated over, and so if the node
-             * had returned the requested number of rows but we still get 
here, then some results were skipped
-             * during reconciliation.
-             */
-            public UnfilteredRowIterator moreContents()
-            {
-                // never try to request additional rows from replicas if our 
reconciled partition is already filled to the limit
-                assert !mergedResultCounter.isDoneForPartition();
-
-                // we do not apply short read protection when we have no 
limits at all
-                assert !command.limits().isUnlimited();
-
-                /*
-                 * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more.
-                 *
-                 * Can only take the shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
-                 * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
-                 */
-                if (!singleResultCounter.isDoneForPartition() && 
command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
-                    return null;
-
-                /*
-                 * If the replica has no live rows in the partition, don't try 
to fetch more.
-                 *
-                 * Note that the previous branch [if 
(!singleResultCounter.isDoneForPartition()) return null] doesn't
-                 * always cover this scenario:
-                 * isDoneForPartition() is defined as [isDone() || 
rowInCurrentPartition >= perPartitionLimit],
-                 * and will return true if isDone() returns true, even if 
there are 0 rows counted in the current partition.
-                 *
-                 * This can happen with a range read if after 1+ rounds of 
short read protection requests we managed to fetch
-                 * enough extra rows for other partitions to satisfy the 
singleResultCounter's total row limit, but only
-                 * have tombstones in the current partition.
-                 *
-                 * One other way we can hit this condition is when the 
partition only has a live static row and no regular
-                 * rows. In that scenario the counter will remain at 0 until 
the partition is closed - which happens after
-                 * the moreContents() call.
-                 */
-                if (countedInCurrentPartition(singleResultCounter) == 0)
-                    return null;
-
-                /*
-                 * This is a table with no clustering columns, and has at most 
one row per partition - with EMPTY clustering.
-                 * We already have the row, so there is no point in asking for 
more from the partition.
-                 */
-                if (Clustering.EMPTY == lastClustering)
-                    return null;
-
-                lastFetched = countedInCurrentPartition(singleResultCounter) - 
lastCounted;
-                lastCounted = countedInCurrentPartition(singleResultCounter);
-
-                // getting back fewer rows than we asked for means the 
partition on the replica has been fully consumed
-                if (lastQueried > 0 && lastFetched < lastQueried)
-                    return null;
-
-                /*
-                 * At this point we know that:
-                 *     1. the replica returned [repeatedly?] as many rows as 
we asked for and potentially has more
-                 *        rows in the partition
-                 *     2. at least one of those returned rows was shadowed by 
a tombstone returned from another
-                 *        replica
-                 *     3. we haven't satisfied the client's limits yet, and 
should attempt to query for more rows to
-                 *        avoid a short read
-                 *
-                 * In the ideal scenario, we would get exactly min(a, b) or 
fewer rows from the next request, where a and b
-                 * are defined as follows:
-                 *     [a] limits.count() - mergedResultCounter.counted()
-                 *     [b] limits.perPartitionCount() - 
mergedResultCounter.countedInCurrentPartition()
-                 *
-                 * It would be naive to query for exactly that many rows, as 
it's possible and not unlikely
-                 * that some of the returned rows would also be shadowed by 
tombstones from other hosts.
-                 *
-                 * Note: we don't know, nor do we care, how many rows from the 
replica made it into the reconciled result;
-                 * we can only tell how many in total we queried for, and that 
[0, mrc.countedInCurrentPartition()) made it.
-                 *
-                 * In general, our goal should be to minimise the number of 
extra requests - *not* to minimise the number
-                 * of rows fetched: there is a high transactional cost for 
every individual request, but a relatively low
-                 * marginal cost for each extra row requested.
-                 *
-                 * As such it's better to overfetch than to underfetch extra 
rows from a host; but at the same
-                 * time we want to respect paging limits and not blow up 
spectacularly.
-                 *
-                 * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
-                 * counts.
-                 *
-                 * With that in mind, we'll just request the minimum of 
(count(), perPartitionCount()) limits.
-                 *
-                 * See CASSANDRA-13794 for more details.
-                 */
-                lastQueried = Math.min(command.limits().count(), 
command.limits().perPartitionCount());
-
-                
ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
-                Tracing.trace("Requesting {} extra rows from {} for short read 
protection", lastQueried, source);
-
-                SinglePartitionReadCommand cmd = 
makeFetchAdditionalRowsReadCommand(lastQueried);
-                return 
UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
-            }
-
-            // Counts the number of rows for regular queries and the number of 
groups for GROUP BY queries
-            private int countedInCurrentPartition(Counter counter)
-            {
-                return command.limits().isGroupByLimit()
-                     ? counter.rowCountedInCurrentPartition()
-                     : counter.countedInCurrentPartition();
-            }
-
-            private SinglePartitionReadCommand 
makeFetchAdditionalRowsReadCommand(int toQuery)
-            {
-                ClusteringIndexFilter filter = 
command.clusteringIndexFilter(partitionKey);
-                if (null != lastClustering)
-                    filter = filter.forPaging(metadata.comparator, 
lastClustering, false);
-
-                return SinglePartitionReadCommand.create(command.metadata(),
-                                                         command.nowInSec(),
-                                                         
command.columnFilter(),
-                                                         command.rowFilter(),
-                                                         
command.limits().forShortReadRetry(toQuery),
-                                                         partitionKey,
-                                                         filter,
-                                                         
command.indexMetadata());
-            }
-        }
-
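Pulling together the long comment block in ShortReadRowsProtection.moreContents() above: the stop/continue logic reduces to a handful of checks before the deliberately generous min(count(), perPartitionCount()) request. The sketch below is illustrative only -- plain ints instead of DataLimits.Counter, and it elides the per-partition-limit subtlety discussed around CASSANDRA-13911.

// Illustrative stand-in for ShortReadRowsProtection's bookkeeping; not Cassandra code.
final class RowsRetryDecision
{
    private int lastCounted = 0; // rows counted in this partition before the previous attempt
    private int lastQueried = 0; // rows requested from the replica on the previous attempt

    /** Returns the number of extra rows to request from the replica, or 0 to stop asking. */
    int nextRequestSize(int countedInCurrentPartition,
                        boolean replicaDoneForPartition,
                        boolean noClusteringColumns,
                        int totalLimit,
                        int perPartitionLimit)
    {
        if (!replicaDoneForPartition)
            return 0; // the replica didn't hit its limit, so nothing was cut short here
        if (countedInCurrentPartition == 0 || noClusteringColumns)
            return 0; // no live rows from this replica, or at most one row per partition anyway
        int lastFetched = countedInCurrentPartition - lastCounted;
        lastCounted = countedInCurrentPartition;
        if (lastQueried > 0 && lastFetched < lastQueried)
            return 0; // got fewer rows than asked for: the replica's partition is exhausted
        // over-fetch on purpose to minimise round trips rather than rows (CASSANDRA-13794)
        lastQueried = Math.min(totalLimit, perPartitionLimit);
        return lastQueried;
    }

    public static void main(String[] args)
    {
        RowsRetryDecision d = new RowsRetryDecision();
        System.out.println(d.nextRequestSize(10, true, false, 100, 10)); // 10: ask for more
        System.out.println(d.nextRequestSize(12, true, false, 100, 10)); // 0: only 2 of 10 arrived
    }
}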
-        private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
-        {
-            DataResolver resolver = new DataResolver(keyspace, cmd, 
ConsistencyLevel.ONE, 1, queryStartNanoTime);
-            ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, cmd, Collections.singletonList(source), 
queryStartNanoTime);
-
-            if (StorageProxy.canDoLocalRequest(source))
-                StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(cmd, handler));
-            else
-                
MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, 
handler);
-
-            // We don't call handler.get() because we want to preserve 
tombstones since we're still in the middle of merging node results.
-            handler.awaitResults();
-            assert resolver.responses.size() == 1;
-            return resolver.responses.get(0).payload.makeIterator(command);
-        }
-    }
-}
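At the partition level, ShortReadPartitionsProtection.moreContents() above sizes its retry differently: it asks for whatever is still missing from the global row limit, falling back to the per-partition limit when no global limit is set. A minimal sketch with an illustrative NO_LIMIT sentinel and example numbers standing in for DataLimits:

// Illustrative only; NO_LIMIT and the example values are stand-ins for DataLimits.
final class PartitionsRetrySizing
{
    static final int NO_LIMIT = Integer.MAX_VALUE;

    static int toQuery(int totalRowLimit, int perPartitionLimit, int rowsAlreadyCounted)
    {
        return totalRowLimit != NO_LIMIT
             ? totalRowLimit - rowsAlreadyCounted // remaining rows under the global limit
             : perPartitionLimit;                 // no global limit: fall back to the per-partition one
    }

    public static void main(String[] args)
    {
        System.out.println(toQuery(100, 10, 80));      // 20
        System.out.println(toQuery(NO_LIMIT, 10, 80)); // 10
    }
}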

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DigestMismatchException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestMismatchException.java 
b/src/java/org/apache/cassandra/service/DigestMismatchException.java
deleted file mode 100644
index 18d5939..0000000
--- a/src/java/org/apache/cassandra/service/DigestMismatchException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-@SuppressWarnings("serial")
-public class DigestMismatchException extends Exception
-{
-    public DigestMismatchException(DecoratedKey key, ByteBuffer digest1, 
ByteBuffer digest2)
-    {
-        super(String.format("Mismatch for key %s (%s vs %s)",
-                            key.toString(),
-                            ByteBufferUtil.bytesToHex(digest1),
-                            ByteBufferUtil.bytesToHex(digest2)));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java 
b/src/java/org/apache/cassandra/service/DigestResolver.java
deleted file mode 100644
index 6a528e9..0000000
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.net.MessageIn;
-
-public class DigestResolver extends ResponseResolver
-{
-    private volatile ReadResponse dataResponse;
-
-    public DigestResolver(Keyspace keyspace, ReadCommand command, 
ConsistencyLevel consistency, int maxResponseCount)
-    {
-        super(keyspace, command, consistency, maxResponseCount);
-    }
-
-    @Override
-    public void preprocess(MessageIn<ReadResponse> message)
-    {
-        super.preprocess(message);
-        if (dataResponse == null && !message.payload.isDigestResponse())
-            dataResponse = message.payload;
-    }
-
-    /**
-     * Special case of resolve() so that CL.ONE reads never throw 
DigestMismatchException in the foreground
-     */
-    public PartitionIterator getData()
-    {
-        assert isDataPresent();
-        return 
UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), 
command.nowInSec());
-    }
-
-    /*
-     * This method handles two different scenarios:
-     *
-     * a) we're handling the initial read of data from the closest replica + 
digests
-     *    from the rest. In this case we check the digests against each other,
-     *    throw an exception if there is a mismatch, otherwise return the data 
row.
-     *
-     * b) we're checking additional digests that arrived after the minimum to 
handle
-     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
-     */
-    public PartitionIterator resolve() throws DigestMismatchException
-    {
-        if (responses.size() == 1)
-            return getData();
-
-        if (logger.isTraceEnabled())
-            logger.trace("resolving {} responses", responses.size());
-
-        compareResponses();
-
-        return 
UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), 
command.nowInSec());
-    }
-
-    public void compareResponses() throws DigestMismatchException
-    {
-        long start = System.nanoTime();
-
-        // validate digests against each other; throw immediately on mismatch.
-        ByteBuffer digest = null;
-        for (MessageIn<ReadResponse> message : responses)
-        {
-            ReadResponse response = message.payload;
-
-            ByteBuffer newDigest = response.digest(command);
-            if (digest == null)
-                digest = newDigest;
-            else if (!digest.equals(newDigest))
-                // rely on the fact that only single partition queries use 
digests
-                throw new 
DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), 
digest, newDigest);
-        }
-
-        if (logger.isTraceEnabled())
-            logger.trace("resolve: {} ms.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-    }
-
-    public boolean isDataPresent()
-    {
-        return dataResponse != null;
-    }
-}
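The digest check that resolve() delegates to compareResponses() above follows a simple pattern: every digest is compared against the first one seen, and the first mismatch aborts the digest read path (the caller then falls back to a full data read and read repair). A standalone sketch, with plain ByteBuffers standing in for ReadResponse.digest(command) results:

import java.nio.ByteBuffer;
import java.util.List;

// Illustrative only; the real code throws DigestMismatchException with the partition key.
final class DigestComparisonSketch
{
    static void compare(List<ByteBuffer> digests)
    {
        ByteBuffer reference = null;
        for (ByteBuffer digest : digests)
        {
            if (reference == null)
                reference = digest;             // first digest becomes the reference
            else if (!reference.equals(digest)) // content comparison, position to limit
                throw new IllegalStateException("digest mismatch");
        }
    }

    public static void main(String[] args)
    {
        compare(List.of(ByteBuffer.wrap(new byte[]{ 1 }), ByteBuffer.wrap(new byte[]{ 1 }))); // ok
    }
}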

