http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 75885ae..c296cba 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -21,8 +21,9 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import com.google.common.base.Predicates;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,6 @@ import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 
 import static com.google.common.collect.Iterables.all;
-import static com.google.common.collect.Iterables.tryFind;
 
 /**
  * Sends a read request to the replicas needed to satisfy a given 
ConsistencyLevel.
@@ -65,24 +65,25 @@ public abstract class AbstractReadExecutor
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractReadExecutor.class);
 
     protected final ReadCommand command;
-    private   final ReplicaLayout.ForToken replicaLayout;
-    protected final ReadRepair<EndpointsForToken, ReplicaLayout.ForToken> 
readRepair;
-    protected final DigestResolver<EndpointsForToken, ReplicaLayout.ForToken> 
digestResolver;
-    protected final ReadCallback<EndpointsForToken, ReplicaLayout.ForToken> 
handler;
+    private   final ReplicaPlan.SharedForTokenRead replicaPlan;
+    protected final ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> 
readRepair;
+    protected final DigestResolver<EndpointsForToken, 
ReplicaPlan.ForTokenRead> digestResolver;
+    protected final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> 
handler;
     protected final TraceState traceState;
     protected final ColumnFamilyStore cfs;
     protected final long queryStartNanoTime;
     private   final int initialDataRequestCount;
     protected volatile PartitionIterator result = null;
 
-    AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, 
ReplicaLayout.ForToken replicaLayout, int initialDataRequestCount, long 
queryStartNanoTime)
+    AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, 
ReplicaPlan.ForTokenRead replicaPlan, int initialDataRequestCount, long 
queryStartNanoTime)
     {
         this.command = command;
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = ReplicaPlan.shared(replicaPlan);
         this.initialDataRequestCount = initialDataRequestCount;
-        this.readRepair = ReadRepair.create(command, replicaLayout, 
queryStartNanoTime);
-        this.digestResolver = new DigestResolver<>(command, replicaLayout, 
queryStartNanoTime);
-        this.handler = new ReadCallback<>(digestResolver, 
replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, 
replicaLayout, queryStartNanoTime);
+        // the ReadRepair and DigestResolver both need to see our updated replica plan, so pass them the shared reference
+        this.readRepair = ReadRepair.create(command, this.replicaPlan, 
queryStartNanoTime);
+        this.digestResolver = new DigestResolver<>(command, this.replicaPlan, 
queryStartNanoTime);
+        this.handler = new ReadCallback<>(digestResolver, command, 
this.replicaPlan, queryStartNanoTime);
         this.cfs = cfs;
         this.traceState = Tracing.instance.get();
         this.queryStartNanoTime = queryStartNanoTime;
@@ -93,7 +94,7 @@ public abstract class AbstractReadExecutor
         // TODO: we need this when talking with pre-3.0 nodes. So if we 
preserve the digest format moving forward, we can get rid of this once
         // we stop being compatible with pre-3.0 nodes.
         int digestVersion = MessagingService.current_version;
-        for (Replica replica : replicaLayout.selected())
+        for (Replica replica : replicaPlan.contacts())
             digestVersion = Math.min(digestVersion, 
MessagingService.instance().getVersion(replica.endpoint()));
         command.setDigestVersion(digestVersion);
     }
@@ -168,7 +169,7 @@ public abstract class AbstractReadExecutor
      */
     public void executeAsync()
     {
-        EndpointsForToken selected = replicaLayout().selected();
+        EndpointsForToken selected = replicaPlan().contacts();
         EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, 
initialDataRequestCount);
         makeFullDataRequests(fullDataRequests);
         makeTransientDataRequests(selected.filter(Replica::isTransient));
@@ -184,30 +185,25 @@ public abstract class AbstractReadExecutor
         ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(command.metadata().id);
         SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
 
-        // Endpoints for Token
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, 
command.partitionKey().getToken(), consistencyLevel, retry);
+        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, 
command.partitionKey().getToken(), consistencyLevel, retry);
 
         // Speculative retry is disabled *OR*
         // 11980: Disable speculative retry if using EACH_QUORUM in order to 
prevent miscounting DC responses
         if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || 
consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-            // TODO Looks like we might want to move speculation into the 
replica layout, but that might be a story for post-4.0
-            return new NeverSpeculatingReadExecutor(cfs, command, 
replicaLayout, queryStartNanoTime, false);
+            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, 
queryStartNanoTime, false);
 
         // There are simply no extra replicas to speculate.
         // Handle this separately so it can record failed attempts to 
speculate due to lack of replicas
-        if (replicaLayout.selected().size() == replicaLayout.all().size())
+        if (replicaPlan.contacts().size() == replicaPlan.candidates().size())
         {
             boolean recordFailedSpeculation = consistencyLevel != 
ConsistencyLevel.ALL;
-            return new NeverSpeculatingReadExecutor(cfs, command, 
replicaLayout, queryStartNanoTime, recordFailedSpeculation);
+            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, 
queryStartNanoTime, recordFailedSpeculation);
         }
 
-        // If CL.ALL, upgrade to AlwaysSpeculating;
-        // If We are going to contact every node anyway, ask for 2 full data 
requests instead of 1, for redundancy
-        // (same amount of requests in total, but we turn 1 digest request 
into a full blown data request)
         if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
-            return new AlwaysSpeculatingReadExecutor(cfs, command, 
replicaLayout, queryStartNanoTime);
+            return new AlwaysSpeculatingReadExecutor(cfs, command, 
replicaPlan, queryStartNanoTime);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(cfs, command, replicaLayout, 
queryStartNanoTime);
+            return new SpeculatingReadExecutor(cfs, command, replicaPlan, 
queryStartNanoTime);
     }
 
     /**
@@ -223,9 +219,9 @@ public abstract class AbstractReadExecutor
         return !handler.await(cfs.sampleReadLatencyNanos, 
TimeUnit.NANOSECONDS);
     }
 
-    ReplicaLayout.ForToken replicaLayout()
+    ReplicaPlan.ForTokenRead replicaPlan()
     {
-        return replicaLayout;
+        return replicaPlan.get();
     }
 
     void onReadTimeout() {}
@@ -239,9 +235,9 @@ public abstract class AbstractReadExecutor
          */
         private final boolean logFailedSpeculation;
 
-        public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand 
command, ReplicaLayout.ForToken replicaLayout, long queryStartNanoTime, boolean 
logFailedSpeculation)
+        public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand 
command, ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime, boolean 
logFailedSpeculation)
         {
-            super(cfs, command, replicaLayout, 1, queryStartNanoTime);
+            super(cfs, command, replicaPlan, 1, queryStartNanoTime);
             this.logFailedSpeculation = logFailedSpeculation;
         }
 
@@ -260,13 +256,13 @@ public abstract class AbstractReadExecutor
 
         public SpeculatingReadExecutor(ColumnFamilyStore cfs,
                                        ReadCommand command,
-                                       ReplicaLayout.ForToken replicaLayout,
+                                       ReplicaPlan.ForTokenRead replicaPlan,
                                        long queryStartNanoTime)
         {
             // We're hitting additional targets for read repair (??).  Since 
our "extra" replica is the least-
             // preferred by the snitch, we do an extra data read to start with 
against a replica more
             // likely to reply; better to let RR fail than the entire query.
-            super(cfs, command, replicaLayout, replicaLayout.blockFor() < 
replicaLayout.selected().size() ? 2 : 1, queryStartNanoTime);
+            super(cfs, command, replicaPlan, replicaPlan.blockFor() < 
replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime);
         }
 
         public void maybeTryAdditionalReplicas()
@@ -277,12 +273,12 @@ public abstract class AbstractReadExecutor
                 cfs.metric.speculativeRetries.inc();
                 speculated = true;
 
+                ReplicaPlan.ForTokenRead replicaPlan = replicaPlan();
                 ReadCommand retryCommand = command;
                 Replica extraReplica;
                 if (handler.resolver.isDataPresent())
                 {
-                    extraReplica = tryFind(replicaLayout().all(),
-                            r -> 
!replicaLayout().selected().contains(r)).orNull();
+                    extraReplica = 
replicaPlan.firstUncontactedCandidate(Predicates.alwaysTrue());
 
                     // we should only use a SpeculatingReadExecutor if we have 
an extra replica to speculate against
                     assert extraReplica != null;
@@ -293,8 +289,7 @@ public abstract class AbstractReadExecutor
                 }
                 else
                 {
-                    extraReplica = tryFind(replicaLayout().all(),
-                            r -> r.isFull() && 
!replicaLayout().selected().contains(r)).orNull();
+                    extraReplica = 
replicaPlan.firstUncontactedCandidate(Replica::isFull);
                     if (extraReplica == null)
                     {
                         cfs.metric.speculativeInsufficientReplicas.inc();
@@ -304,6 +299,11 @@ public abstract class AbstractReadExecutor
                     }
                 }
 
+                // we must update the plan to include this new node, else when 
we come to read-repair, we may not include this
+                // speculated response in the data requests we make again, and 
we will not be able to 'speculate' an extra repair read,
+                // nor would we be able to speculate a new 'write' if the 
repair writes are insufficient
+                super.replicaPlan.addToContacts(extraReplica);
+
                 if (traceState != null)
                     traceState.trace("speculating read retry on {}", 
extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
@@ -325,10 +325,12 @@ public abstract class AbstractReadExecutor
     {
         public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
                                              ReadCommand command,
-                                             ReplicaLayout.ForToken 
replicaLayout,
+                                             ReplicaPlan.ForTokenRead 
replicaPlan,
                                              long queryStartNanoTime)
         {
-            super(cfs, command, replicaLayout, replicaLayout.selected().size() 
> 1 ? 2 : 1, queryStartNanoTime);
+            // presumably, we speculate an extra data request here in case it 
is our data request that fails to respond,
+            // and there are no more nodes to consult
+            super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 
? 2 : 1, queryStartNanoTime);
         }
 
         public void maybeTryAdditionalReplicas()
@@ -403,7 +405,7 @@ public abstract class AbstractReadExecutor
                 logger.trace("Timed out waiting on digest mismatch repair 
requests");
             // the caught exception here will have CL.ALL from the repair 
command,
             // not whatever CL the initial command was at (CASSANDRA-7947)
-            throw new ReadTimeoutException(replicaLayout().consistencyLevel(), 
handler.blockfor - 1, handler.blockfor, true);
+            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), 
handler.blockFor - 1, handler.blockFor, true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java 
b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index a6901b2..db5f3c8 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.db.transform.Filter;
 import org.apache.cassandra.db.transform.FilteredPartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
@@ -52,14 +52,14 @@ import 
org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 import static com.google.common.collect.Iterables.*;
 import static 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener;
 
-public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, 
L>> extends ResponseResolver<E, L>
+public class DataResolver<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
 {
     private final boolean enforceStrictLiveness;
-    private final ReadRepair<E, L> readRepair;
+    private final ReadRepair<E, P> readRepair;
 
-    public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> 
readRepair, long queryStartNanoTime)
+    public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
     {
-        super(command, replicaLayout, queryStartNanoTime);
+        super(command, replicaPlan, queryStartNanoTime);
         this.enforceStrictLiveness = 
command.metadata().enforceStrictLiveness();
         this.readRepair = readRepair;
     }
@@ -83,7 +83,7 @@ public class DataResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
         Collection<MessageIn<ReadResponse>> messages = responses.snapshot();
         assert !any(messages, msg -> msg.payload.isDigestResponse());
 
-        E replicas = replicaLayout.all().keep(transform(messages, msg -> 
msg.from));
+        E replicas = replicaPlan().candidates().select(transform(messages, msg 
-> msg.from), false);
         List<UnfilteredPartitionIterator> iters = new ArrayList<>(
         Collections2.transform(messages, msg -> 
msg.payload.makeIterator(command)));
         assert replicas.size() == iters.size();
@@ -121,7 +121,7 @@ public class DataResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
             command.limits().newCounter(command.nowInSec(), true, 
command.selectsFullPartition(), enforceStrictLiveness);
 
         UnfilteredPartitionIterator merged = 
mergeWithShortReadProtection(iters,
-                                                                          
replicaLayout.withSelected(replicas),
+                                                                          
replicaPlan.getWithContacts(replicas),
                                                                           
mergedResultCounter,
                                                                           
repairedDataTracker);
         FilteredPartitions filtered = FilteredPartitions.filter(merged, new 
Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
@@ -135,7 +135,7 @@ public class DataResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
     }
 
     private UnfilteredPartitionIterator 
mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
-                                                                     L sources,
+                                                                     P sources,
                                                                      
DataLimits.Counter mergedResultCounter,
                                                                      
RepairedDataTracker repairedDataTracker)
     {
@@ -150,7 +150,7 @@ public class DataResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
          */
         if (!command.limits().isUnlimited())
             for (int i = 0; i < results.size(); i++)
-                results.set(i, 
ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, 
mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
+                results.set(i, 
ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, 
mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
 
         return UnfilteredPartitionIterators.merge(results, 
wrapMergeListener(readRepair.getMergeListener(sources), sources, 
repairedDataTracker));
     }
@@ -161,7 +161,7 @@ public class DataResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
     }
 
     private UnfilteredPartitionIterators.MergeListener 
wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
-                                                                         L 
sources,
+                                                                         P 
sources,
                                                                          
RepairedDataTracker repairedDataTracker)
     {
         // Avoid wrapping no-op listeners as it doesn't throw
@@ -191,7 +191,7 @@ public class DataResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
                                                            table,
                                                            mergedDeletion == 
null ? "null" : mergedDeletion.toString(),
                                                            '[' + Joiner.on(", 
").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : 
rt.toString())) + ']',
-                                                           sources.selected(),
+                                                           sources.contacts(),
                                                            
makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }
@@ -212,7 +212,7 @@ public class DataResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
                                                            table,
                                                            merged == null ? 
"null" : merged.toString(table),
                                                            '[' + Joiner.on(", 
").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : 
rt.toString(table))) + ']',
-                                                           sources.selected(),
+                                                           sources.contacts(),
                                                            
makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }
@@ -238,7 +238,7 @@ public class DataResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
                                                            table,
                                                            merged == null ? 
"null" : merged.toString(table),
                                                            '[' + Joiner.on(", 
").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : 
rt.toString(table))) + ']',
-                                                           sources.selected(),
+                                                           sources.contacts(),
                                                            
makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java 
b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 28c2117..0dcae95 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -29,10 +29,10 @@ import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
@@ -40,13 +40,13 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static com.google.common.collect.Iterables.any;
 
-public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, 
L>> extends ResponseResolver<E, L>
+public class DigestResolver<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
 {
     private volatile MessageIn<ReadResponse> dataResponse;
 
-    public DigestResolver(ReadCommand command, L replicas, long 
queryStartNanoTime)
+    public DigestResolver(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, long queryStartNanoTime)
     {
-        super(command, replicas, queryStartNanoTime);
+        super(command, replicaPlan, queryStartNanoTime);
         Preconditions.checkArgument(command instanceof 
SinglePartitionReadCommand,
                                     "DigestResolver can only be used with 
SinglePartitionReadCommand commands");
     }
@@ -55,7 +55,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L
     public void preprocess(MessageIn<ReadResponse> message)
     {
         super.preprocess(message);
-        Replica replica = replicaLayout.getReplicaFor(message.from);
+        Replica replica = replicaPlan().getReplicaFor(message.from);
         if (dataResponse == null && !message.payload.isDigestResponse() && 
replica.isFull())
         {
             dataResponse = message;
@@ -76,7 +76,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L
     {
         return any(responses,
                 msg -> !msg.payload.isDigestResponse()
-                        && 
replicaLayout.getReplicaFor(msg.from).isTransient());
+                        && 
replicaPlan().getReplicaFor(msg.from).isTransient());
     }
 
     public PartitionIterator getData()
@@ -93,16 +93,14 @@ public class DigestResolver<E extends Endpoints<E>, L 
extends ReplicaLayout<E, L
         {
             // This path can be triggered only if we've got responses from 
full replicas and they match, but
             // transient replica response still contains data, which needs to 
be reconciled.
-            DataResolver<E, L> dataResolver = new DataResolver<>(command,
-                                                                 replicaLayout,
-                                                                 
NoopReadRepair.instance,
-                                                                 
queryStartNanoTime);
+            DataResolver<E, P> dataResolver
+                    = new DataResolver<>(command, replicaPlan, 
NoopReadRepair.instance, queryStartNanoTime);
 
             dataResolver.preprocess(dataResponse);
             // Reconcile with transient replicas
             for (MessageIn<ReadResponse> response : responses)
             {
-                Replica replica = replicaLayout.getReplicaFor(response.from);
+                Replica replica = replicaPlan().getReplicaFor(response.from);
                 if (replica.isTransient())
                     dataResolver.preprocess(response);
             }
@@ -119,7 +117,7 @@ public class DigestResolver<E extends Endpoints<E>, L 
extends ReplicaLayout<E, L
         ByteBuffer digest = null;
         for (MessageIn<ReadResponse> message : responses.snapshot())
         {
-            if (replicaLayout.getReplicaFor(message.from).isTransient())
+            if (replicaPlan().getReplicaFor(message.from).isTransient())
                 continue;
 
             ByteBuffer newDigest = message.payload.digest(command);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java 
b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 3d39377..7a2385c 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -23,19 +23,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.PartitionRangeReadCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
@@ -44,16 +43,17 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
-public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, 
L>> implements IAsyncCallbackWithFailure<ReadResponse>
+public class ReadCallback<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>> implements IAsyncCallbackWithFailure<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( 
ReadCallback.class );
 
     public final ResponseResolver resolver;
     final SimpleCondition condition = new SimpleCondition();
     private final long queryStartNanoTime;
-    // TODO: move to replica layout as well?
-    final int blockfor;
-    final L replicaLayout;
+    final int blockFor; // TODO: move to replica plan as well?
+    // this uses a plain reference, but is initialised before handoff to any 
other threads; the later updates
+    // may not be visible to the threads immediately, but ReplicaPlan only 
contains final fields, so they will never see an uninitialised object
+    final ReplicaPlan.Shared<E, P> replicaPlan;
     private final ReadCommand command;
     private static final AtomicIntegerFieldUpdater<ReadCallback> 
recievedUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, 
"received");
@@ -63,19 +63,24 @@ public class ReadCallback<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint;
 
-    public ReadCallback(ResponseResolver resolver, int blockfor, ReadCommand 
command, L replicaLayout, long queryStartNanoTime)
+    public ReadCallback(ResponseResolver resolver, ReadCommand command, 
ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
         this.command = command;
-        this.blockfor = blockfor;
         this.resolver = resolver;
         this.queryStartNanoTime = queryStartNanoTime;
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
+        this.blockFor = replicaPlan.get().blockFor();
         this.failureReasonByEndpoint = new ConcurrentHashMap<>();
         // we don't support read repair (or rapid read protection) for range 
scans yet (CASSANDRA-6897)
-        assert !(command instanceof PartitionRangeReadCommand) || blockfor >= 
replicaLayout.selected().size();
+        assert !(command instanceof PartitionRangeReadCommand) || blockFor >= 
replicaPlan().contacts().size();
 
         if (logger.isTraceEnabled())
-            logger.trace("Blockfor is {}; setting up requests to {}", 
blockfor, this.replicaLayout);
+            logger.trace("Blockfor is {}; setting up requests to {}", 
blockFor, this.replicaPlan);
+    }
+
+    protected P replicaPlan()
+    {
+        return replicaPlan.get();
     }
 
     public boolean await(long timePastStart, TimeUnit unit)
@@ -94,30 +99,30 @@ public class ReadCallback<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
     public void awaitResults() throws ReadFailureException, 
ReadTimeoutException
     {
         boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
-        boolean failed = failures > 0 && blockfor + failures > 
replicaLayout.selected().size();
+        boolean failed = failures > 0 && blockFor + failures > 
replicaPlan().contacts().size();
         if (signaled && !failed)
             return;
 
         if (Tracing.isTracing())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " 
(including data)" : " (only digests)") : "";
-            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ 
(failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ 
(failed ? "Failed" : "Timed out"), received, blockFor, gotData });
         }
         else if (logger.isDebugEnabled())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " 
(including data)" : " (only digests)") : "";
-            logger.debug("{}; received {} of {} responses{}", new Object[]{ 
(failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+            logger.debug("{}; received {} of {} responses{}", new Object[]{ 
(failed ? "Failed" : "Timed out"), received, blockFor, gotData });
         }
 
         // Same as for writes, see AbstractWriteResponseHandler
         throw failed
-            ? new ReadFailureException(replicaLayout.consistencyLevel(), 
received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
-            : new ReadTimeoutException(replicaLayout.consistencyLevel(), 
received, blockfor, resolver.isDataPresent());
+            ? new ReadFailureException(replicaPlan().consistencyLevel(), 
received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
+            : new ReadTimeoutException(replicaPlan().consistencyLevel(), 
received, blockFor, resolver.isDataPresent());
     }
 
     public int blockFor()
     {
-        return blockfor;
+        return blockFor;
     }
 
     public void response(MessageIn<ReadResponse> message)
@@ -127,24 +132,16 @@ public class ReadCallback<E extends Endpoints<E>, L 
extends ReplicaLayout<E, L>>
               ? recievedUpdater.incrementAndGet(this)
               : received;
 
-        if (n >= blockfor && resolver.isDataPresent())
+        if (n >= blockFor && resolver.isDataPresent())
             condition.signalAll();
     }
 
     /**
-     * @return true if the message counts towards the blockfor threshold
+     * @return true if the message counts towards the blockFor threshold
      */
     private boolean waitingFor(InetAddressAndPort from)
     {
-        return !replicaLayout.consistencyLevel().isDatacenterLocal() || 
DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
-    }
-
-    /**
-     * @return the current number of received responses
-     */
-    public int getReceivedCount()
-    {
-        return received;
+        return !replicaPlan().consistencyLevel().isDatacenterLocal() || 
DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
     }
 
     public void response(ReadResponse result)
@@ -157,11 +154,6 @@ public class ReadCallback<E extends Endpoints<E>, L 
extends ReplicaLayout<E, L>>
         response(message);
     }
 
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        
replicaLayout.consistencyLevel().assureSufficientLiveNodesForRead(replicaLayout.keyspace(),
 replicaLayout.selected());
-    }
-
     public boolean isLatencyForSnitch()
     {
         return true;
@@ -176,7 +168,7 @@ public class ReadCallback<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
 
         failureReasonByEndpoint.put(from, failureReason);
 
-        if (blockfor + n > replicaLayout.selected().size())
+        if (blockFor + n > replicaPlan().contacts().size())
             condition.signalAll();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java 
b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 298f843..0c1e1ba 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -23,30 +23,34 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.concurrent.Accumulator;
 
-public abstract class ResponseResolver<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
+public abstract class ResponseResolver<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(ResponseResolver.class);
 
     protected final ReadCommand command;
-    protected final L replicaLayout;
+    protected final ReplicaPlan.Shared<E, P> replicaPlan;
 
     // Accumulator gives us non-blocking thread-safety with optimal 
algorithmic constraints
     protected final Accumulator<MessageIn<ReadResponse>> responses;
     protected final long queryStartNanoTime;
 
-    public ResponseResolver(ReadCommand command, L replicaLayout, long 
queryStartNanoTime)
+    public ResponseResolver(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, long queryStartNanoTime)
     {
         this.command = command;
-        this.replicaLayout = replicaLayout;
-        // TODO: calculate max possible replicas for the query (e.g. local dc 
queries won't contact remotes)
-        this.responses = new Accumulator<>(replicaLayout.all().size());
+        this.replicaPlan = replicaPlan;
+        this.responses = new 
Accumulator<>(replicaPlan.get().candidates().size());
         this.queryStartNanoTime = queryStartNanoTime;
     }
 
+    protected P replicaPlan()
+    {
+        return replicaPlan.get();
+    }
+
     public abstract boolean isDataPresent();
 
     public void preprocess(MessageIn<ReadResponse> message)
@@ -57,8 +61,8 @@ public abstract class ResponseResolver<E extends 
Endpoints<E>, L extends Replica
         }
         catch (IllegalStateException e)
         {
-            logger.error("Encountered error while trying to preprocess the 
message {}, in command {}, replica layout: {}",
-                         message, command, replicaLayout);
+            logger.error("Encountered error while trying to preprocess the 
message {}, in command {}, replica plan: {}",
+                         message, command, replicaPlan);
             throw e;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
 
b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 580b790..b16d105 100644
--- 
a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ 
b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -19,6 +19,8 @@
 package org.apache.cassandra.service.reads;
 
 import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +42,6 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.ExcludingBounds;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
@@ -87,10 +88,11 @@ public class ShortReadPartitionsProtection extends 
Transformation<UnfilteredRowI
          * If we don't apply the transformation *after* extending the 
partition with MoreRows,
          * applyToRow() method of protection will not be called on the first 
row of the new extension iterator.
          */
-        ReplicaLayout.ForToken replicaLayout = 
ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), 
partition.partitionKey().getToken(), source);
+        ReplicaPlan.ForTokenRead replicaPlan = 
ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), 
partition.partitionKey().getToken(), source);
+        ReplicaPlan.SharedForTokenRead sharedReplicaPlan = 
ReplicaPlan.shared(replicaPlan);
         ShortReadRowsProtection protection = new 
ShortReadRowsProtection(partition.partitionKey(),
                                                                          
command, source,
-                                                                         (cmd) 
-> executeReadCommand(cmd, replicaLayout),
+                                                                         (cmd) 
-> executeReadCommand(cmd, sharedReplicaPlan),
                                                                          
singleResultCounter,
                                                                          
mergedResultCounter);
         return Transformation.apply(MoreRows.extend(partition, protection), 
protection);
@@ -169,14 +171,15 @@ public class ShortReadPartitionsProtection extends 
Transformation<UnfilteredRowI
                                                       : new 
ExcludingBounds<>(lastPartitionKey, bounds.right);
         DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
 
-        ReplicaLayout.ForRange replicaLayout = 
ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), 
cmd.dataRange().keyRange(), source);
-        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, 
newDataRange), replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = 
ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), 
cmd.dataRange().keyRange(), source);
+        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, 
newDataRange), ReplicaPlan.shared(replicaPlan));
     }
 
-    private <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> 
UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, L replicaLayout)
+    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+    UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, 
ReplicaPlan.Shared<E, P> replicaPlan)
     {
-        DataResolver<E, L> resolver = new DataResolver<>(cmd, replicaLayout, 
(NoopReadRepair<E, L>)NoopReadRepair.instance, queryStartNanoTime);
-        ReadCallback<E, L> handler = new ReadCallback<>(resolver, 
replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), cmd, 
replicaLayout, queryStartNanoTime);
+        DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, 
(NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
+        ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, 
replicaPlan, queryStartNanoTime);
 
         if (source.isLocal())
             StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(cmd, handler));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 528d31b..1b213ff 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
 import com.google.common.base.Preconditions;
 
 import com.codahale.metrics.Meter;
+import com.google.common.base.Predicates;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -34,7 +35,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.DigestResolver;
 import org.apache.cassandra.service.reads.ReadCallback;
 import org.apache.cassandra.tracing.Tracing;
 
-public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
+        implements ReadRepair<E, P>
 {
     protected final ReadCommand command;
     protected final long queryStartNanoTime;
-    protected final L replicaLayout;
+    protected final ReplicaPlan.Shared<E, P> replicaPlan;
     protected final ColumnFamilyStore cfs;
 
     private volatile DigestRepair digestRepair = null;
@@ -68,15 +70,20 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, L extends Repli
     }
 
     public AbstractReadRepair(ReadCommand command,
-                              L replicaLayout,
+                              ReplicaPlan.Shared<E, P> replicaPlan,
                               long queryStartNanoTime)
     {
         this.command = command;
         this.queryStartNanoTime = queryStartNanoTime;
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
         this.cfs = Keyspace.openAndGetStore(command.metadata());
     }
 
+    protected P replicaPlan()
+    {
+        return replicaPlan.get();
+    }
+
     void sendReadCommand(Replica to, ReadCallback readCallback)
     {
         MessageOut<ReadCommand> message = command.createMessage();
@@ -90,14 +97,13 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, L extends Repli
     abstract Meter getRepairMeter();
 
     // digestResolver isn't used here because we resend read requests to all 
participants
-    public void startRepair(DigestResolver<E, L> digestResolver, 
Consumer<PartitionIterator> resultConsumer)
+    public void startRepair(DigestResolver<E, P> digestResolver, 
Consumer<PartitionIterator> resultConsumer)
     {
         getRepairMeter().mark();
 
         // Do a full data read to resolve the correct response (and repair 
node that need be)
-        DataResolver<E, L> resolver = new DataResolver<>(command, 
replicaLayout, this, queryStartNanoTime);
-        ReadCallback<E, L> readCallback = new ReadCallback<>(resolver, 
replicaLayout.consistencyLevel().blockFor(cfs.keyspace),
-                                                             command, 
replicaLayout, queryStartNanoTime);
+        DataResolver<E, P> resolver = new DataResolver<>(command, replicaPlan, 
this, queryStartNanoTime);
+        ReadCallback<E, P> readCallback = new ReadCallback<>(resolver, 
command, replicaPlan, queryStartNanoTime);
 
         digestRepair = new DigestRepair(resolver, readCallback, 
resultConsumer);
 
@@ -105,12 +111,12 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, L extends Repli
         if 
(DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
             command.trackRepairedStatus();
 
-        for (Replica replica : replicaLayout.selected())
+        for (Replica replica : replicaPlan().contacts())
         {
             Tracing.trace("Enqueuing full data read to {}", replica);
             sendReadCommand(replica, readCallback);
         }
-        ReadRepairDiagnostics.startRepair(this, replicaLayout, digestResolver);
+        ReadRepairDiagnostics.startRepair(this, replicaPlan(), digestResolver);
     }
 
     public void awaitReads() throws ReadTimeoutException
@@ -125,7 +131,7 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, L extends Repli
 
     private boolean shouldSpeculate()
     {
-        ConsistencyLevel consistency = replicaLayout.consistencyLevel();
+        ConsistencyLevel consistency = replicaPlan().consistencyLevel();
         ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
         return  consistency != ConsistencyLevel.EACH_QUORUM
                 && consistency.satisfies(speculativeCL, cfs.keyspace)
@@ -142,15 +148,15 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, L extends Repli
 
         if (shouldSpeculate() && 
!repair.readCallback.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS))
         {
-            L uncontacted = replicaLayout.forNaturalUncontacted();
-            if (uncontacted.selected().isEmpty())
+            Replica uncontacted = 
replicaPlan().firstUncontactedCandidate(Predicates.alwaysTrue());
+            if (uncontacted == null)
                 return;
 
-            Replica replica = uncontacted.selected().iterator().next();
-            Tracing.trace("Enqueuing speculative full data read to {}", 
replica);
-            sendReadCommand(replica, repair.readCallback);
+            replicaPlan.addToContacts(uncontacted);
+            Tracing.trace("Enqueuing speculative full data read to {}", 
uncontacted);
+            sendReadCommand(uncontacted, repair.readCallback);
             ReadRepairMetrics.speculatedRead.mark();
-            ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), 
uncontacted);
+            ReadRepairDiagnostics.speculatedRead(this, uncontacted.endpoint(), 
replicaPlan());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 54af2cf..f536ea8 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -37,9 +37,9 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -49,25 +49,26 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.tracing.Tracing;
 
-public class BlockingPartitionRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>> extends AbstractFuture<Object> implements 
IAsyncCallback<Object>
+public class BlockingPartitionRepair<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
+        extends AbstractFuture<Object> implements IAsyncCallback<Object>
 {
     private final DecoratedKey key;
-    private final L replicaLayout;
+    private final P replicaPlan;
     private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
 
     private volatile long mutationsSentTime;
 
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> 
repairs, int maxBlockFor, L replicaLayout)
+    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> 
repairs, int maxBlockFor, P replicaPlan)
     {
         this.key = key;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
 
         // here we remove empty repair mutations from the block for total, 
since
         // we're not sending them mutations
         int blockFor = maxBlockFor;
-        for (Replica participant: replicaLayout.selected())
+        for (Replica participant: replicaPlan.contacts())
         {
             // remote dcs can sometimes get involved in dc-local reads. We 
want to repair
             // them if they do, but they shouldn't interfere with blocking the 
client read.
@@ -97,7 +98,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, 
L extends ReplicaLa
 
     private boolean shouldBlockOn(InetAddressAndPort endpoint)
     {
-        return !replicaLayout.consistencyLevel().isDatacenterLocal() || 
isLocal(endpoint);
+        return !replicaPlan.consistencyLevel().isDatacenterLocal() || 
isLocal(endpoint);
     }
 
     @VisibleForTesting
@@ -105,7 +106,7 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, L extends ReplicaLa
     {
         if (shouldBlockOn(from))
         {
-            pendingRepairs.remove(replicaLayout.getReplicaFor(from));
+            pendingRepairs.remove(replicaPlan.getReplicaFor(from));
             latch.countDown();
         }
     }
@@ -198,8 +199,8 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, L extends ReplicaLa
         if (awaitRepairs(timeout, timeoutUnit))
             return;
 
-        L newCandidates = replicaLayout.forNaturalUncontacted();
-        if (newCandidates.selected().isEmpty())
+        E newCandidates = replicaPlan.uncontactedCandidates();
+        if (newCandidates.isEmpty())
             return;
 
         PartitionUpdate update = mergeUnackedUpdates();
@@ -212,7 +213,7 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, L extends ReplicaLa
 
         Mutation[] versionedMutations = new 
Mutation[msgVersionIdx(MessagingService.current_version) + 1];
 
-        for (Replica replica : newCandidates.selected())
+        for (Replica replica : newCandidates)
         {
             int versionIdx = 
msgVersionIdx(MessagingService.instance().getVersion(replica.endpoint()));
 
@@ -220,7 +221,7 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, L extends ReplicaLa
 
             if (mutation == null)
             {
-                mutation = BlockingReadRepairs.createRepairMutation(update, 
replicaLayout.consistencyLevel(), replica.endpoint(), true);
+                mutation = BlockingReadRepairs.createRepairMutation(update, 
replicaPlan.consistencyLevel(), replica.endpoint(), true);
                 versionedMutations[versionIdx] = mutation;
             }
 
@@ -239,7 +240,7 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, L extends ReplicaLa
 
     Keyspace getKeyspace()
     {
-        return replicaLayout.keyspace();
+        return replicaPlan.keyspace();
     }
 
     DecoratedKey getKey()
@@ -249,6 +250,6 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, L extends ReplicaLa
 
     ConsistencyLevel getConsistency()
     {
-        return replicaLayout.consistencyLevel();
+        return replicaPlan.consistencyLevel();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 402aed0..938abaf 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.locator.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +33,9 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.tracing.Tracing;
 
@@ -44,22 +44,23 @@ import org.apache.cassandra.tracing.Tracing;
  *  updates have been written to nodes needing correction. Breaks write
  *  atomicity in some situations
  */
-public class BlockingReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
+public class BlockingReadRepair<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
+        extends AbstractReadRepair<E, P>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(BlockingReadRepair.class);
 
     protected final Queue<BlockingPartitionRepair> repairs = new 
ConcurrentLinkedQueue<>();
     private final int blockFor;
 
-    BlockingReadRepair(ReadCommand command, L replicaLayout, long 
queryStartNanoTime)
+    BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, long queryStartNanoTime)
     {
-        super(command, replicaLayout, queryStartNanoTime);
-        this.blockFor = 
replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
+        super(command, replicaPlan, queryStartNanoTime);
+        this.blockFor = 
replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
     }
 
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(L 
replicaLayout)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(P 
replicaPlan)
     {
-        return new PartitionIteratorMergeListener(replicaLayout, command, 
this.replicaLayout.consistencyLevel(), this);
+        return new PartitionIteratorMergeListener<>(replicaPlan, command, 
this);
     }
 
     @Override
@@ -91,20 +92,20 @@ public class BlockingReadRepair<E extends Endpoints<E>, L 
extends ReplicaLayout<
         if (timedOut)
         {
             // We got all responses, but timed out while repairing
-            int blockFor = 
replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
+            int blockFor = replicaPlan().blockFor();
             if (Tracing.isTracing())
                 Tracing.trace("Timed out while read-repairing after receiving 
all {} data and digest responses", blockFor);
             else
                 logger.debug("Timeout while read-repairing after receiving all 
{} data and digest responses", blockFor);
 
-            throw new ReadTimeoutException(replicaLayout.consistencyLevel(), 
blockFor - 1, blockFor, true);
+            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), 
blockFor - 1, blockFor, true);
         }
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, L replicaLayout)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, P replicaPlan)
     {
-        BlockingPartitionRepair<E, L> blockingRepair = new 
BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaLayout);
+        BlockingPartitionRepair<E, P> blockingRepair = new 
BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan);
         blockingRepair.sendInitialRepairs();
         repairs.add(blockingRepair);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 4af4a92..6aa6ece 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -26,28 +26,29 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;
 
 /**
  * Bypasses the read repair path for short read protection and testing
  */
-public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, 
L>> implements ReadRepair<E, L>
+public class NoopReadRepair<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
+        implements ReadRepair<E, P>
 {
     public static final NoopReadRepair instance = new NoopReadRepair();
 
     private NoopReadRepair() {}
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(L 
replicas)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(P 
replicas)
     {
         return UnfilteredPartitionIterators.MergeListener.NOOP;
     }
 
     @Override
-    public void startRepair(DigestResolver<E, L> digestResolver, 
Consumer<PartitionIterator> resultConsumer)
+    public void startRepair(DigestResolver<E, P> digestResolver, 
Consumer<PartitionIterator> resultConsumer)
     {
         resultConsumer.accept(digestResolver.getData());
     }
@@ -75,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, L replicaLayout)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, P replicaPlan)
     {
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
 
b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 4cae3ae..7247704 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -28,26 +28,26 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
 
-public class PartitionIteratorMergeListener implements 
UnfilteredPartitionIterators.MergeListener
+public class PartitionIteratorMergeListener<E extends Endpoints<E>>
+        implements UnfilteredPartitionIterators.MergeListener
 {
-    private final ReplicaLayout replicaLayout;
+    private final ReplicaPlan.ForRead<E> replicaPlan;
     private final ReadCommand command;
-    private final ConsistencyLevel consistency;
     private final ReadRepair readRepair;
 
-    public PartitionIteratorMergeListener(ReplicaLayout replicaLayout, 
ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+    public PartitionIteratorMergeListener(ReplicaPlan.ForRead<E> replicaPlan, 
ReadCommand command, ReadRepair readRepair)
     {
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
         this.command = command;
-        this.consistency = consistency;
         this.readRepair = readRepair;
     }
 
     public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
     {
-        return new RowIteratorMergeListener(partitionKey, columns(versions), 
isReversed(versions), replicaLayout, command, consistency, readRepair);
+        return new RowIteratorMergeListener<>(partitionKey, columns(versions), 
isReversed(versions), replicaPlan, command, readRepair);
     }
 
     protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> 
versions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index c13e2d6..64bfec2 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -27,22 +27,23 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 
 /**
  * Only performs the collection of data responses and reconciliation of them, 
doesn't send repair mutations
  * to replicas. This preserves write atomicity, but doesn't provide monotonic 
quorum reads
  */
-public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
+public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
+        extends AbstractReadRepair<E, P>
 {
-    ReadOnlyReadRepair(ReadCommand command, L replicaLayout, long 
queryStartNanoTime)
+    ReadOnlyReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, long queryStartNanoTime)
     {
-        super(command, replicaLayout, queryStartNanoTime);
+        super(command, replicaPlan, queryStartNanoTime);
     }
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(L 
replicaLayout)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(P 
replicaPlan)
     {
         return UnfilteredPartitionIterators.MergeListener.NOOP;
     }
@@ -60,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, L 
extends ReplicaLayout<
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, L replicaLayout)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, P replicaPlan)
     {
         throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't 
be trying to repair partitions");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 168f003..9441945 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service.reads.repair;
 
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.locator.Endpoints;
@@ -29,17 +30,19 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;
 
-public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, 
L>>
+public interface ReadRepair<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
 {
     public interface Factory
     {
-        <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, 
L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime);
+        <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, long queryStartNanoTime);
     }
 
-    static <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> 
ReadRepair<E, L> create(ReadCommand command, L replicaPlan, long 
queryStartNanoTime)
+    static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+    ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, long queryStartNanoTime)
     {
         return command.metadata().params.readRepair.create(command, 
replicaPlan, queryStartNanoTime);
     }
@@ -47,7 +50,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L
     /**
      * Used by DataResolver to generate corrections as the partition iterator 
is consumed
      */
-    UnfilteredPartitionIterators.MergeListener getMergeListener(L 
replicaLayout);
+    UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan);
 
     /**
      * Called when the digests from the initial read don't match. Reads may 
block on the
@@ -55,7 +58,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L
      * @param digestResolver supplied so we can get the original data response
      * @param resultConsumer hook for the repair to set it's result on 
completion
      */
-    public void startRepair(DigestResolver<E, L> digestResolver, 
Consumer<PartitionIterator> resultConsumer);
+    public void startRepair(DigestResolver<E, P> digestResolver, 
Consumer<PartitionIterator> resultConsumer);
 
     /**
      * Block on the reads (or timeout) sent out in {@link 
ReadRepair#startRepair}
@@ -90,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L
      * Repairs a partition _after_ receiving data responses. This method 
receives replica list, since
      * we will block repair only on the replicas that have responded.
      */
-    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> 
mutations, L replicaLayout);
+    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> 
mutations, P replicaPlan);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
index 4c74a89..b9167bd 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.diag.DiagnosticEventService;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;
 import 
org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType;
 import 
org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
@@ -38,22 +39,22 @@ final class ReadRepairDiagnostics
     {
     }
 
-    static void startRepair(AbstractReadRepair readRepair, ReplicaLayout<?, ?> 
layout, DigestResolver digestResolver)
+    static void startRepair(AbstractReadRepair readRepair, 
ReplicaPlan.ForRead<?> fullPlan, DigestResolver digestResolver)
     {
         if (service.isEnabled(ReadRepairEvent.class, 
ReadRepairEventType.START_REPAIR))
             service.publish(new 
ReadRepairEvent(ReadRepairEventType.START_REPAIR,
                                                 readRepair,
-                                                layout.selected().endpoints(),
-                                                layout.all().endpoints(), 
digestResolver));
+                                                
fullPlan.contacts().endpoints(),
+                                                
fullPlan.candidates().endpoints(), digestResolver));
     }
 
     static void speculatedRead(AbstractReadRepair readRepair, 
InetAddressAndPort endpoint,
-                               ReplicaLayout<?, ?> replicaLayout)
+                               ReplicaPlan.ForRead<?> fullPlan)
     {
         if (service.isEnabled(ReadRepairEvent.class, 
ReadRepairEventType.SPECULATED_READ))
             service.publish(new 
ReadRepairEvent(ReadRepairEventType.SPECULATED_READ,
                                                 readRepair, 
Collections.singletonList(endpoint),
-                                                
Lists.newArrayList(replicaLayout.all().endpoints()), null));
+                                                
Lists.newArrayList(fullPlan.candidates().endpoints()), null));
     }
 
     static void sendInitialRepair(BlockingPartitionRepair partitionRepair, 
InetAddressAndPort destination, Mutation mutation)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
index 9e14362..5cec802 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
@@ -67,7 +67,7 @@ final class ReadRepairEvent extends DiagnosticEvent
         this.keyspace = readRepair.cfs.keyspace;
         this.tableName = readRepair.cfs.getTableName();
         this.cqlCommand = readRepair.command.toCQLString();
-        this.consistency = readRepair.replicaLayout.consistencyLevel();
+        this.consistency = readRepair.replicaPlan().consistencyLevel();
         this.speculativeRetry = 
readRepair.cfs.metadata().params.speculativeRetry.kind();
         this.destinations = destinations;
         this.allEndpoints = allEndpoints;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
index 28c0e9e..7a4b795 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
@@ -21,22 +21,25 @@ package org.apache.cassandra.service.reads.repair;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 
 public enum ReadRepairStrategy implements ReadRepair.Factory
 {
     NONE
     {
-        public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> 
ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long 
queryStartNanoTime)
+        public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, long queryStartNanoTime)
         {
-            return new ReadOnlyReadRepair<>(command, replicaLayout, 
queryStartNanoTime);
+            return new ReadOnlyReadRepair<>(command, replicaPlan, 
queryStartNanoTime);
         }
     },
 
     BLOCKING
     {
-        public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> 
ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long 
queryStartNanoTime)
+        public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, long queryStartNanoTime)
         {
-            return new BlockingReadRepair<>(command, replicaLayout, 
queryStartNanoTime);
+            return new BlockingReadRepair<>(command, replicaPlan, 
queryStartNanoTime);
         }
     };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
 
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 7fe797a..60e0d41 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -46,21 +46,21 @@ import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.schema.ColumnMetadata;
 
-public class RowIteratorMergeListener implements 
UnfilteredRowIterators.MergeListener
+public class RowIteratorMergeListener<E extends Endpoints<E>>
+        implements UnfilteredRowIterators.MergeListener
 {
     private final DecoratedKey partitionKey;
     private final RegularAndStaticColumns columns;
     private final boolean isReversed;
     private final ReadCommand command;
-    private final ConsistencyLevel consistency;
 
     private final PartitionUpdate.Builder[] repairs;
     private final Row.Builder[] currentRows;
     private final RowDiffListener diffListener;
-    private final ReplicaLayout layout;
+    private final ReplicaPlan.ForRead<E> replicaPlan;
 
     // The partition level deletion for the merge row.
     private DeletionTime partitionLevelDeletion;
@@ -73,19 +73,18 @@ public class RowIteratorMergeListener implements 
UnfilteredRowIterators.MergeLis
 
     private final ReadRepair readRepair;
 
-    public RowIteratorMergeListener(DecoratedKey partitionKey, 
RegularAndStaticColumns columns, boolean isReversed, ReplicaLayout layout, 
ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+    public RowIteratorMergeListener(DecoratedKey partitionKey, 
RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> 
replicaPlan, ReadCommand command, ReadRepair readRepair)
     {
         this.partitionKey = partitionKey;
         this.columns = columns;
         this.isReversed = isReversed;
-        this.layout = layout;
-        int size = layout.selected().size();
+        this.replicaPlan = replicaPlan;
+        int size = replicaPlan.contacts().size();
         repairs = new PartitionUpdate.Builder[size];
         currentRows = new Row.Builder[size];
         sourceDeletionTime = new DeletionTime[size];
         markerToRepair = new ClusteringBound[size];
         this.command = command;
-        this.consistency = consistency;
         this.readRepair = readRepair;
 
         this.diffListener = new RowDiffListener()
@@ -310,7 +309,7 @@ public class RowIteratorMergeListener implements 
UnfilteredRowIterators.MergeLis
     public void close()
     {
         Map<Replica, Mutation> mutations = null;
-        Endpoints<?> sources = layout.selected();
+        Endpoints<?> sources = replicaPlan.contacts();
         for (int i = 0; i < repairs.length; i++)
         {
             if (repairs[i] == null)
@@ -318,7 +317,7 @@ public class RowIteratorMergeListener implements 
UnfilteredRowIterators.MergeLis
 
             Replica source = sources.get(i);
 
-            Mutation mutation = 
BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, 
source.endpoint(), false);
+            Mutation mutation = 
BlockingReadRepairs.createRepairMutation(repairs[i].build(), 
replicaPlan.consistencyLevel(), source.endpoint(), false);
             if (mutation == null)
                 continue;
 
@@ -330,7 +329,7 @@ public class RowIteratorMergeListener implements 
UnfilteredRowIterators.MergeLis
 
         if (mutations != null)
         {
-            readRepair.repairPartition(partitionKey, mutations, layout);
+            readRepair.repairPartition(partitionKey, mutations, replicaPlan);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java 
b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
index 66eff23..f937f96 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -22,7 +22,6 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -34,13 +33,14 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.UnknownHostException;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static com.google.common.collect.Iterables.*;
+import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
 
@@ -111,7 +111,7 @@ public class ReplicaCollectionTest
 
         void testEquals()
         {
-            Assert.assertTrue(Iterables.elementsEqual(canonicalList, test));
+            Assert.assertTrue(elementsEqual(canonicalList, test));
         }
 
         void testEndpoints()
@@ -144,28 +144,6 @@ public class ReplicaCollectionTest
             Assert.assertEquals(new 
LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), 
test.endpoints());
         }
 
-        void testSelect(int subListDepth, int filterDepth, int sortDepth, int 
selectDepth)
-        {
-            TestCase<C> allMatchZeroCapacity = new 
TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), 
Collections.emptyList());
-            allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, 
selectDepth - 1);
-
-            TestCase<C> noMatchFullCapacity = new 
TestCase<>(test.select().add(Predicates.alwaysFalse(), 
canonicalList.size()).get(), Collections.emptyList());
-            noMatchFullCapacity.testAll(subListDepth, filterDepth, 
sortDepth,selectDepth - 1);
-
-            if (canonicalList.size() <= 2)
-                return;
-
-            List<Replica> newOrderList = 
ImmutableList.of(canonicalList.get(2), canonicalList.get(1), 
canonicalList.get(0));
-            TestCase<C> newOrder = new TestCase<>(
-                    test.select()
-                            .add(r -> r == newOrderList.get(0), 3)
-                            .add(r -> r == newOrderList.get(1), 3)
-                            .add(r -> r == newOrderList.get(2), 3)
-                            .get(), newOrderList
-            );
-            newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth 
- 1);
-        }
-
         private void assertSubList(C subCollection, int from, int to)
         {
             Assert.assertTrue(subCollection.isSnapshot);
@@ -182,7 +160,7 @@ public class ReplicaCollectionTest
             }
         }
 
-        void testSubList(int subListDepth, int filterDepth, int sortDepth, int 
selectDepth)
+        void testSubList(int subListDepth, int filterDepth, int sortDepth)
         {
             if (test.isSnapshot)
                 Assert.assertSame(test, test.subList(0, test.size()));
@@ -192,34 +170,62 @@ public class ReplicaCollectionTest
 
             TestCase<C> skipFront = new TestCase<>(test.subList(1, 
test.size()), canonicalList.subList(1, canonicalList.size()));
             assertSubList(skipFront.test, 1, canonicalList.size());
-            skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, 
selectDepth);
+            skipFront.testAll(subListDepth - 1, filterDepth, sortDepth);
             TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() 
- 1), canonicalList.subList(0, canonicalList.size() - 1));
             assertSubList(skipBack.test, 0, canonicalList.size() - 1);
-            skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, 
selectDepth);
+            skipBack.testAll(subListDepth - 1, filterDepth, sortDepth);
         }
 
-        void testFilter(int subListDepth, int filterDepth, int sortDepth, int 
selectDepth)
+        void testFilter(int subListDepth, int filterDepth, int sortDepth)
         {
             if (test.isSnapshot)
                 Assert.assertSame(test, test.filter(Predicates.alwaysTrue()));
 
             if (test.isEmpty())
                 return;
+
             // remove start
             // we recurse on the same subset in testSubList, so just 
corroborate we have the correct list here
-            assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, 
canonicalList.size());
+            {
+                Predicate<Replica> removeFirst = r -> r != 
canonicalList.get(0);
+                assertSubList(test.filter(removeFirst), 1, 
canonicalList.size());
+                assertSubList(test.filter(removeFirst, 1), 1, 
Math.min(canonicalList.size(), 2));
+            }
 
             if (test.size() <= 1)
                 return;
+
             // remove end
             // we recurse on the same subset in testSubList, so just 
corroborate we have the correct list here
-            assertSubList(test.filter(r -> r != 
canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1);
+            {
+                int last = canonicalList.size() - 1;
+                Predicate<Replica> removeLast = r -> r != 
canonicalList.get(last);
+                assertSubList(test.filter(removeLast), 0, last);
+            }
 
             if (test.size() <= 2)
                 return;
+
             Predicate<Replica> removeMiddle = r -> r != 
canonicalList.get(canonicalList.size() / 2);
-            TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), 
ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test)));
-            filtered.testAll(subListDepth, filterDepth - 1, sortDepth, 
selectDepth);
+            TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), 
ImmutableList.copyOf(filter(canonicalList, removeMiddle::test)));
+            filtered.testAll(subListDepth, filterDepth - 1, sortDepth);
+        }
+
+        void testCount()
+        {
+            Assert.assertEquals(0, test.count(Predicates.alwaysFalse()));
+
+            if (test.isEmpty())
+            {
+                Assert.assertEquals(0, test.count(Predicates.alwaysTrue()));
+                return;
+            }
+
+            for (int i = 0 ; i < canonicalList.size() ; ++i)
+            {
+                Replica discount = canonicalList.get(i);
+                Assert.assertEquals(canonicalList.size() - 1, test.count(r -> 
r != discount));
+            }
         }
 
         void testContains()
@@ -235,7 +241,7 @@ public class ReplicaCollectionTest
                 Assert.assertEquals(canonicalList.get(i), test.get(i));
         }
 
-        void testSort(int subListDepth, int filterDepth, int sortDepth, int 
selectDepth)
+        void testSort(int subListDepth, int filterDepth, int sortDepth)
         {
             final Comparator<Replica> comparator = (o1, o2) ->
             {
@@ -244,10 +250,10 @@ public class ReplicaCollectionTest
                 return f1 == f2 ? 0 : f1 ? 1 : -1;
             };
             TestCase<C> sorted = new TestCase<>(test.sorted(comparator), 
ImmutableList.sortedCopyOf(comparator, canonicalList));
-            sorted.testAll(subListDepth, filterDepth, sortDepth - 1, 
selectDepth);
+            sorted.testAll(subListDepth, filterDepth, sortDepth - 1);
         }
 
-        private void testAll(int subListDepth, int filterDepth, int sortDepth, 
int selectDepth)
+        private void testAll(int subListDepth, int filterDepth, int sortDepth)
         {
             testEndpoints();
             testOrderOfIteration();
@@ -255,19 +261,18 @@ public class ReplicaCollectionTest
             testGet();
             testEquals();
             testSize();
+            testCount();
             if (subListDepth > 0)
-                testSubList(subListDepth, filterDepth, sortDepth, selectDepth);
+                testSubList(subListDepth, filterDepth, sortDepth);
             if (filterDepth > 0)
-                testFilter(subListDepth, filterDepth, sortDepth, selectDepth);
+                testFilter(subListDepth, filterDepth, sortDepth);
             if (sortDepth > 0)
-                testSort(subListDepth, filterDepth, sortDepth, selectDepth);
-            if (selectDepth > 0)
-                testSelect(subListDepth, filterDepth, sortDepth, selectDepth);
+                testSort(subListDepth, filterDepth, sortDepth);
         }
 
         public void testAll()
         {
-            testAll(2, 2, 2, 2);
+            testAll(2, 2, 2);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to