http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java 
b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
new file mode 100644
index 0000000..25f42c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.limit;
+
+public class ReplicaPlans
+{
+
+    /**
+     * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. 
This node is *assumed* to be alive.
+     */
+    public static ReplicaPlan.ForTokenWrite forSingleReplicaWrite(Keyspace 
keyspace, Token token, Replica replica)
+    {
+        EndpointsForToken one = EndpointsForToken.of(token, replica);
+        EndpointsForToken empty = EndpointsForToken.empty(token);
+        return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, 
empty, one, one, one);
+    }
+
+    /**
+     * A forwarding counter write is always sent to a single owning 
coordinator for the range, by the original coordinator
+     * (if it is not itself an owner)
+     */
+    public static ReplicaPlan.ForTokenWrite forForwardingCounterWrite(Keyspace 
keyspace, Token token, Replica replica)
+    {
+        return forSingleReplicaWrite(keyspace, token, replica);
+    }
+
+    /**
+     * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
+     * Note that the liveAndDown and live collections are both equal to the provided endpoints.
+     *
+     * The semantics are a bit weird: CL is ONE iff exactly one node is provided, and TWO otherwise.
+     * How these CLs were chosen, and why we drop to ONE when only one live node is available, are both unclear.
+     */
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    {
+        // A single case where we write not for a range or a token, but many mutations spanning many tokens
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+                EndpointsForToken.empty(token)
+        );
+        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? 
ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        // assume that we have already been given live endpoints, and skip 
applying the failure detector
+        return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, 
writeAll);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, Token token, Selector selector) throws 
UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, 
ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
+    }
+
+    @VisibleForTesting
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken 
pending, Predicate<Replica> isAlive, Selector selector) throws 
UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, 
ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, 
Selector selector) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, liveAndDown, 
FailureDetector.isReplicaAlive, selector);
+    }
+
+    @VisibleForTesting
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, 
Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+    {
+        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
+        return forWrite(keyspace, consistencyLevel, liveAndDown, live, 
selector);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, 
ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
+    {
+        EndpointsForToken contacts = selector.select(keyspace, 
consistencyLevel, liveAndDown, live);
+        ReplicaPlan.ForTokenWrite result = new 
ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), 
liveAndDown.all(), live.all(), contacts);
+        result.assureSufficientReplicas();
+        return result;
+    }
+
+    public interface Selector
+    {
+        <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L 
liveAndDown, L live);
+    }
+
+    /**
+     * Select all nodes, transient or otherwise, as targets for the operation.
+     *
+     * This may be of limited use until we finish implementing transient replication support; however,
+     * it can be of value to stipulate that a given write goes to all nodes without regard to transient status.
+     */
+    public static final Selector writeAll = new Selector()
+    {
+        @Override
+        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L 
liveAndDown, L live)
+        {
+            return liveAndDown.all();
+        }
+    };
+
+    /**
+     * Select all full nodes, live or down, as write targets.  If there are 
insufficient nodes to complete the write,
+     * but there are live transient nodes, select a sufficient number of these 
to reach our consistency level.
+     *
+     * Pending nodes are always contacted, whether or not they are full.  When 
a transient replica is undergoing
+     * a pending move to a new node, if we write (transiently) to it, this 
write would not be replicated to the
+     * pending transient node, and so when completing the move, the write 
could effectively have not reached the
+     * promised consistency level.
+     */
+    public static final Selector writeNormal = new Selector()
+    {
+        @Override
+        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L 
liveAndDown, L live)
+        {
+            if (!any(liveAndDown.all(), Replica::isTransient))
+                return liveAndDown.all();
+
+            assert consistencyLevel != ConsistencyLevel.EACH_QUORUM;
+
+            ReplicaCollection.Mutable<E> contacts = 
liveAndDown.all().newMutable(liveAndDown.all().size());
+            contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
+            contacts.addAll(liveAndDown.pending());
+
+            // TODO: this doesn't correctly handle LOCAL_QUORUM (or 
EACH_QUORUM at all)
+            int liveCount = contacts.count(live.all()::contains);
+            int requiredTransientCount = 
consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount;
+            if (requiredTransientCount > 0)
+                contacts.addAll(limit(filter(live.natural(), 
Replica::isTransient), requiredTransientCount));
+            return contacts.asSnapshot();
+        }
+    };
+
+    /**
+     * Construct the plan for a paxos round - NOT for the write or read consistency level of the write or comparison,
+     * but for the paxos linearisation agreement.
+     *
+     * This will select all live nodes as the candidates for the operation.  Only the required number of participants
+     * (a simple majority of all replicas) must be live for the operation to proceed.
+     */
+    public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, 
DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws 
UnavailableException
+    {
+        Token tk = key.getToken();
+        ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+
+        Replicas.temporaryAssertFull(liveAndDown.all()); // TODO 
CASSANDRA-14547
+
+        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
+        {
+            // TODO: we should clean up our semantics here, as we're filtering ALL nodes down to the local DC, which is unexpected for ReplicaPlan
+            // Restrict natural and pending to nodes in the local DC only
+            String localDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            Predicate<Replica> isLocalDc = replica -> 
localDc.equals(snitch.getDatacenter(replica));
+
+            liveAndDown = liveAndDown.filter(isLocalDc);
+        }
+
+        ReplicaLayout.ForTokenWrite live = 
liveAndDown.filter(FailureDetector.isReplicaAlive);
+
+        // TODO: this should use assureSufficientReplicas
+        int participants = liveAndDown.all().size();
+        int requiredParticipants = participants / 2 + 1; // See 
CASSANDRA-8346, CASSANDRA-833
+
+        EndpointsForToken contacts = live.all();
+        if (contacts.size() < requiredParticipants)
+            throw UnavailableException.create(consistencyForPaxos, 
requiredParticipants, contacts.size());
+
+        // We cannot allow CAS operations with 2 or more pending endpoints, 
see #8346.
+        // Note that we fake an impossible number of required nodes in the 
unavailable exception
+        // to nail home the point that it's an impossible operation no matter 
how many nodes are live.
+        if (liveAndDown.pending().size() > 1)
+            throw new UnavailableException(String.format("Cannot perform LWT 
operation as there is more than one (%d) pending range movement", 
liveAndDown.all().size()),
+                    consistencyForPaxos,
+                    participants + 1,
+                    contacts.size());
+
+        return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, 
liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, 
requiredParticipants);
+    }
+
+    /**
+     * Construct a plan for reading from a single node - this permits no 
speculation or read-repair
+     */
+    public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace 
keyspace, Token token, Replica replica)
+    {
+        EndpointsForToken one = EndpointsForToken.of(token, replica);
+        return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE, 
one, one);
+    }
+
+    /**
+     * Construct a plan for reading from a single node - this permits no 
speculation or read-repair
+     */
+    public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace 
keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
+    {
+        // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; we should refactor Range/AbstractBounds into a single class
+        EndpointsForRange one = EndpointsForRange.of(replica);
+        return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, 
range, one, one);
+    }
+
+    /**
+     * Construct a plan for reading the provided token at the provided 
consistency level.  This translates to a collection of
+     *   - candidates who are: alive, replicate the token, and are sorted by 
their snitch scores
+     *   - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) 
candidates
+     *
+     * The candidate collection can be used for speculation, although at 
present it would break
+     * LOCAL_QUORUM and EACH_QUORUM to do so without further filtering
+     */
+    public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token 
token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+    {
+        ReplicaLayout.ForTokenRead candidates = 
ReplicaLayout.forTokenReadLiveSorted(keyspace, token);
+        EndpointsForToken contacts = consistencyLevel.filterForQuery(keyspace, 
candidates.natural(),
+                retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
+
+        ReplicaPlan.ForTokenRead result = new 
ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates.natural(), 
contacts);
+        result.assureSufficientReplicas(); // Throw UAE early if we don't have 
enough replicas.
+        return result;
+    }
+
+    /**
+     * Construct a plan for reading the provided range at the provided 
consistency level.  This translates to a collection of
+     *   - candidates who are: alive, replicate the range, and are sorted by 
their snitch scores
+     *   - contacts who are: the first blockFor candidates
+     *
+     * There is no speculation for range read queries at present, so we never 
'always speculate' here, and a failed response fails the query.
+     */
+    public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range)
+    {
+        ReplicaLayout.ForRangeRead candidates = 
ReplicaLayout.forRangeReadLiveSorted(keyspace, range);
+        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, 
candidates.natural());
+
+        ReplicaPlan.ForRangeRead result = new 
ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, 
candidates.natural(), contacts);
+        result.assureSufficientReplicas();
+        return result;
+    }
+
+    /**
+     * Take two range read plans for adjacent ranges, and check if it is OK 
(and worthwhile) to combine them into a single plan
+     */
+    public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, 
ReplicaPlan.ForRangeRead right)
+    {
+        // TODO: should we be asserting that the ranges are adjacent?
+        AbstractBounds<PartitionPosition> newRange = 
left.range().withNewRight(right.range().right);
+        EndpointsForRange mergedCandidates = 
left.candidates().keep(right.candidates().endpoints());
+
+        // Check if there are enough shared endpoints for the merge to be 
possible.
+        if (!consistencyLevel.isSufficientLiveReplicasForRead(keyspace, 
mergedCandidates))
+            return null;
+
+        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, 
mergedCandidates);
+
+        // Estimate whether merging will be a win or not
+        if 
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, 
left.contacts(), right.contacts()))
+            return null;
+
+        // If we get here, merge this range and the next one
+        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, 
newRange, mergedCandidates, contacts);
+    }
+}
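
For orientation, a minimal sketch of how the new entry points above compose; the wrapper class and the contactOnlyLive selector are hypothetical, while ReplicaPlans.forWrite, ReplicaPlans.writeNormal and the ReplicaPlan accessors are the ones shown in this diff.

    import org.apache.cassandra.db.ConsistencyLevel;
    import org.apache.cassandra.db.Keyspace;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.exceptions.UnavailableException;
    import org.apache.cassandra.locator.*;

    final class ReplicaPlanSketch
    {
        // Hypothetical selector (not shipped by this commit): contact only the replicas considered live.
        static final ReplicaPlans.Selector contactOnlyLive = new ReplicaPlans.Selector()
        {
            @Override
            public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
            E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
            {
                return live.all();
            }
        };

        // Build a normal write plan; forWrite() calls assureSufficientReplicas() and throws early if the CL cannot be met.
        static ReplicaPlan.ForTokenWrite planForQuorumWrite(Keyspace keyspace, Token token) throws UnavailableException
        {
            ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(keyspace, ConsistencyLevel.QUORUM, token, ReplicaPlans.writeNormal);
            int mustAck = plan.blockFor();               // responses required to satisfy the CL (pending replicas included)
            EndpointsForToken sendTo = plan.contacts();  // replicas the mutation is actually sent to
            return plan;
        }
    }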

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java 
b/src/java/org/apache/cassandra/locator/SystemReplicas.java
index 13a9d74..0d1fc8d 100644
--- a/src/java/org/apache/cassandra/locator/SystemReplicas.java
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -18,12 +18,11 @@
 
 package org.apache.cassandra.locator;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.Collections2;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -50,13 +49,8 @@ public class SystemReplicas
         return systemReplicas.computeIfAbsent(endpoint, 
SystemReplicas::createSystemReplica);
     }
 
-    public static Collection<Replica> 
getSystemReplicas(Collection<InetAddressAndPort> endpoints)
+    public static EndpointsForRange 
getSystemReplicas(Collection<InetAddressAndPort> endpoints)
     {
-        List<Replica> replicas = new ArrayList<>(endpoints.size());
-        for (InetAddressAndPort endpoint: endpoints)
-        {
-            replicas.add(getSystemReplica(endpoint));
-        }
-        return replicas;
+        return EndpointsForRange.copyOf(Collections2.transform(endpoints, 
SystemReplicas::getSystemReplica));
     }
 }
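
A small sketch of the new return type in use, mirroring ReplicaPlans.forBatchlogWrite above; the wrapper class is hypothetical, the two library calls are those shown in the diffs.

    import java.util.Collection;
    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.locator.EndpointsForToken;
    import org.apache.cassandra.locator.InetAddressAndPort;
    import org.apache.cassandra.locator.SystemReplicas;

    final class SystemReplicasSketch
    {
        // Narrow the batchlog system replicas to a single token, as ReplicaPlans.forBatchlogWrite does.
        static EndpointsForToken systemReplicasForBatchlog(Collection<InetAddressAndPort> endpoints)
        {
            Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
            return SystemReplicas.getSystemReplicas(endpoints).forToken(token);
        }
    }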

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 4ab34db..ad40d7b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -1224,15 +1224,11 @@ public class TokenMetadata
     /**
      * @deprecated retained for benefit of old tests
      */
+    @Deprecated
     public EndpointsForToken getWriteEndpoints(Token token, String 
keyspaceName, EndpointsForToken natural)
     {
         EndpointsForToken pending = pendingEndpointsForToken(token, 
keyspaceName);
-        if (Endpoints.haveConflicts(natural, pending))
-        {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
-        }
-        return Endpoints.concat(natural, pending);
+        return ReplicaLayout.forTokenWrite(natural, pending).all();
     }
 
     /** @return an endpoint to token multimap representation of 
tokenToEndpointMap (a copy) */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java 
b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 253b412..ceaf072 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -17,12 +17,6 @@
  */
 package org.apache.cassandra.net;
 
-import com.google.common.base.Predicate;
-
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-
 /**
  * implementors of IAsyncCallback need to make sure that any public methods
  * are threadsafe with respect to response() being called from the message
@@ -31,10 +25,6 @@ import org.apache.cassandra.locator.Replica;
  */
 public interface IAsyncCallback<T>
 {
-    final Predicate<InetAddressAndPort> isAlive = 
FailureDetector.instance::isAlive;
-
-    final Predicate<Replica> isReplicaAlive = replica -> 
isAlive.apply(replica.endpoint());
-
     /**
      * @param msg response received.
      */
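
The predicates removed here have a counterpart that ReplicaPlans (above) now references as FailureDetector.isReplicaAlive. A minimal sketch of the equivalent check, using only calls visible elsewhere in this commit; the wrapper class is hypothetical.

    import java.util.function.Predicate;
    import org.apache.cassandra.gms.FailureDetector;
    import org.apache.cassandra.locator.Replica;

    final class LivenessSketch
    {
        // The same liveness test the removed IAsyncCallback fields expressed, as a java.util.function.Predicate.
        static final Predicate<Replica> isReplicaAlive =
            replica -> FailureDetector.instance.isAlive(replica.endpoint());
    }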

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e817cc8..7f51ae7 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -26,8 +26,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.stream.Collectors;
 
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.ReplicaLayout;
 
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -53,7 +53,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     //Count down until all responses and expirations have occurred before deciding whether the ideal CL was reached.
     private AtomicInteger responsesAndExpirations;
     private final SimpleCondition condition = new SimpleCondition();
-    protected final ReplicaLayout.ForToken replicaLayout;
+    protected final ReplicaPlan.ForTokenWrite replicaPlan;
 
     protected final Runnable callback;
     protected final WriteType writeType;
@@ -76,12 +76,12 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
      * @param callback           A callback to be called when the write is 
successful.
      * @param queryStartNanoTime
      */
-    protected AbstractWriteResponseHandler(ReplicaLayout.ForToken 
replicaLayout,
+    protected AbstractWriteResponseHandler(ReplicaPlan.ForTokenWrite 
replicaPlan,
                                            Runnable callback,
                                            WriteType writeType,
                                            long queryStartNanoTime)
     {
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
         this.callback = callback;
         this.writeType = writeType;
         this.failureReasonByEndpoint = new ConcurrentHashMap<>();
@@ -104,19 +104,19 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
 
         if (!success)
         {
-            int blockedFor = totalBlockFor();
+            int blockedFor = blockFor();
             int acks = ackCount();
             // It's pretty unlikely, but we can race between exiting await 
above and here, so
             // that we could now have enough acks. In that case, we "lie" on 
the acks count to
             // avoid sending confusing info to the user (see CASSANDRA-6491).
             if (acks >= blockedFor)
                 acks = blockedFor - 1;
-            throw new WriteTimeoutException(writeType, 
replicaLayout.consistencyLevel(), acks, blockedFor);
+            throw new WriteTimeoutException(writeType, 
replicaPlan.consistencyLevel(), acks, blockedFor);
         }
 
-        if (totalBlockFor() + failures > totalEndpoints())
+        if (blockFor() + failures > candidateReplicaCount())
         {
-            throw new WriteFailureException(replicaLayout.consistencyLevel(), 
ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
+            throw new WriteFailureException(replicaPlan.consistencyLevel(), 
ackCount(), blockFor(), writeType, failureReasonByEndpoint);
         }
     }
 
@@ -135,7 +135,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler)
     {
         this.idealCLDelegate = handler;
-        idealCLDelegate.responsesAndExpirations = new 
AtomicInteger(replicaLayout.selected().size());
+        idealCLDelegate.responsesAndExpirations = new 
AtomicInteger(replicaPlan.contacts().size());
     }
 
     /**
@@ -189,28 +189,30 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     /**
      * @return the minimum number of endpoints that must reply.
      */
-    protected int totalBlockFor()
+    protected int blockFor()
     {
         // During bootstrap, we have to include the pending endpoints or we 
may fail the consistency level
         // guarantees (see #833)
-        return 
replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), 
replicaLayout.pending());
+        return replicaPlan.blockFor();
     }
 
     /**
+     * TODO: this method is brittle for its purpose of deciding when we should fail a query;
+     *       it needs to be aware of the CL, and of which nodes are live/down
      * @return the total number of endpoints the request can be sent to.
      */
-    protected int totalEndpoints()
+    protected int candidateReplicaCount()
     {
-        return replicaLayout.all().size();
+        return replicaPlan.liveAndDown().size();
     }
 
     public ConsistencyLevel consistencyLevel()
     {
-        return replicaLayout.consistencyLevel();
+        return replicaPlan.consistencyLevel();
     }
 
     /**
-     * @return true if the message counts towards the totalBlockFor() threshold
+     * @return true if the message counts towards the blockFor() threshold
      */
     protected boolean waitingFor(InetAddressAndPort from)
     {
@@ -227,11 +229,6 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
      */
     public abstract void response(MessageIn<T> msg);
 
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        
replicaLayout.consistencyLevel().assureSufficientLiveNodesForWrite(replicaLayout.keyspace(),
 replicaLayout.all().filter(isReplicaAlive), replicaLayout.pending());
-    }
-
     protected void signal()
     {
         condition.signalAll();
@@ -250,7 +247,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
 
         failureReasonByEndpoint.put(from, failureReason);
 
-        if (totalBlockFor() + n > totalEndpoints())
+        if (blockFor() + n > candidateReplicaCount())
             signal();
     }
 
@@ -278,11 +275,11 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
             //The condition being signaled is a valid proxy for the CL being 
achieved
             if (!condition.isSignaled())
             {
-                replicaLayout.keyspace().metric.writeFailedIdealCL.inc();
+                replicaPlan.keyspace().metric.writeFailedIdealCL.inc();
             }
             else
             {
-                
replicaLayout.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - 
queryStartNanoTime);
+                
replicaPlan.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - 
queryStartNanoTime);
             }
         }
     }
@@ -292,7 +289,8 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
      */
     public void maybeTryAdditionalReplicas(IMutation mutation, 
StorageProxy.WritePerformer writePerformer, String localDC)
     {
-        if (replicaLayout.all().size() == replicaLayout.selected().size())
+        EndpointsForToken uncontacted = replicaPlan.liveUncontacted();
+        if (uncontacted.isEmpty())
             return;
 
         long timeout = Long.MAX_VALUE;
@@ -313,7 +311,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
                 for (ColumnFamilyStore cf : cfs)
                     cf.metric.speculativeWrites.inc();
 
-                writePerformer.apply(mutation, 
replicaLayout.forNaturalUncontacted(),
+                writePerformer.apply(mutation, 
replicaPlan.withContact(uncontacted),
                                      (AbstractWriteResponseHandler<IMutation>) 
this,
                                      localDC);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 8ffca6a..b32f67e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -331,7 +331,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
             TokenMetadata.Topology topology = 
ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
             Multimap<String, InetAddressAndPort> dcEndpointsMap = 
topology.getDatacenterEndpoints();
             Iterable<InetAddressAndPort> dcEndpoints = 
concat(transform(dataCenters, dcEndpointsMap::get));
-            return neighbors.keep(dcEndpoints);
+            return neighbors.select(dcEndpoints, true);
         }
         else if (hosts != null && !hosts.isEmpty())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java 
b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index ee74df5..63fbc72 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -36,7 +36,7 @@ public class BatchlogResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
 
     public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, 
int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime)
     {
-        super(wrapped.replicaLayout, wrapped.callback, wrapped.writeType, 
queryStartNanoTime);
+        super(wrapped.replicaPlan, wrapped.callback, wrapped.writeType, 
queryStartNanoTime);
         this.wrapped = wrapped;
         this.requiredBeforeFinish = requiredBeforeFinish;
         this.cleanup = cleanup;
@@ -64,24 +64,19 @@ public class BatchlogResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
         wrapped.onFailure(from, failureReason);
     }
 
-    public void assureSufficientLiveNodes()
-    {
-        wrapped.assureSufficientLiveNodes();
-    }
-
     public void get() throws WriteTimeoutException, WriteFailureException
     {
         wrapped.get();
     }
 
-    protected int totalBlockFor()
+    protected int blockFor()
     {
-        return wrapped.totalBlockFor();
+        return wrapped.blockFor();
     }
 
-    protected int totalEndpoints()
+    protected int candidateReplicaCount()
     {
-        return wrapped.totalEndpoints();
+        return wrapped.candidateReplicaCount();
     }
 
     protected boolean waitingFor(InetAddressAndPort from)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index d4cdcc6..4c892ff 100644
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -22,10 +22,10 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
@@ -40,16 +40,16 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
     private final Map<String, AtomicInteger> responses = new HashMap<String, 
AtomicInteger>();
     private final AtomicInteger acks = new AtomicInteger(0);
 
-    public DatacenterSyncWriteResponseHandler(ReplicaLayout.ForToken 
replicaLayout,
+    public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForTokenWrite 
replicaPlan,
                                               Runnable callback,
                                               WriteType writeType,
                                               long queryStartNanoTime)
     {
         // Responses are being managed by the map, so make it 1 for the superclass.
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        assert replicaLayout.consistencyLevel() == 
ConsistencyLevel.EACH_QUORUM;
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        assert replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
 
-        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
replicaLayout.keyspace().getReplicationStrategy();
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
replicaPlan.keyspace().getReplicationStrategy();
 
         for (String dc : strategy.getDatacenters())
         {
@@ -59,7 +59,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
 
         // During bootstrap, we have to include the pending endpoints or we 
may fail the consistency level
         // guarantees (see #833)
-        for (Replica pending : replicaLayout.pending())
+        for (Replica pending : replicaPlan.pending())
         {
             responses.get(snitch.getDatacenter(pending)).incrementAndGet();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index b458a71..a3ef76f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.service;
 
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 
 /**
@@ -27,13 +27,13 @@ import org.apache.cassandra.net.MessageIn;
  */
 public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
 {
-    public DatacenterWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public DatacenterWriteResponseHandler(ReplicaPlan.ForTokenWrite 
replicaPlan,
                                           Runnable callback,
                                           WriteType writeType,
                                           long queryStartNanoTime)
     {
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        assert replicaLayout.consistencyLevel().isDatacenterLocal();
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        assert replicaPlan.consistencyLevel().isDatacenterLocal();
     }
 
     @Override
@@ -54,6 +54,6 @@ public class DatacenterWriteResponseHandler<T> extends 
WriteResponseHandler<T>
     @Override
     protected boolean waitingFor(InetAddressAndPort from)
     {
-        return replicaLayout.consistencyLevel().isLocal(from);
+        return replicaPlan.consistencyLevel().isLocal(from);
     }
 }
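
For context, the waitingFor() override above means only responses from the local datacenter count toward blockFor(). A rough sketch of the check that consistencyLevel().isLocal(from) performs - an assumption about its internals; the snitch calls themselves appear in ReplicaPlans.forPaxos above.

    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.locator.IEndpointSnitch;
    import org.apache.cassandra.locator.InetAddressAndPort;
    import org.apache.cassandra.utils.FBUtilities;

    final class LocalDcSketch
    {
        // Assumed equivalent of consistencyLevel().isLocal(from): does the responder share our datacenter?
        static boolean isLocal(InetAddressAndPort from)
        {
            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
            String localDc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
            return localDc.equals(snitch.getDatacenter(from));
        }
    }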

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5eb43cf..fc49330 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
@@ -135,7 +134,7 @@ public class StorageProxy implements StorageProxyMBean
         standardWritePerformer = (mutation, targets, responseHandler, 
localDataCenter) ->
         {
             assert mutation instanceof Mutation;
-            sendToHintedReplicas((Mutation) mutation, targets.selected(), 
responseHandler, localDataCenter, Stage.MUTATION);
+            sendToHintedReplicas((Mutation) mutation, targets, 
responseHandler, localDataCenter, Stage.MUTATION);
         };
 
         /*
@@ -146,17 +145,17 @@ public class StorageProxy implements StorageProxyMBean
          */
         counterWritePerformer = (mutation, targets, responseHandler, 
localDataCenter) ->
         {
-            EndpointsForToken selected = targets.selected().withoutSelf();
+            EndpointsForToken selected = targets.contacts().withoutSelf();
             Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
-            counterWriteTask(mutation, selected, responseHandler, 
localDataCenter).run();
+            counterWriteTask(mutation, targets.withContact(selected), 
responseHandler, localDataCenter).run();
         };
 
         counterWriteOnCoordinatorPerformer = (mutation, targets, 
responseHandler, localDataCenter) ->
         {
-            EndpointsForToken selected = targets.selected().withoutSelf();
+            EndpointsForToken selected = targets.contacts().withoutSelf();
             Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
             StageManager.getStage(Stage.COUNTER_MUTATION)
-                        .execute(counterWriteTask(mutation, selected, 
responseHandler, localDataCenter));
+                        .execute(counterWriteTask(mutation, 
targets.withContact(selected), responseHandler, localDataCenter));
         };
 
         for(ConsistencyLevel level : ConsistencyLevel.values())
@@ -232,9 +231,9 @@ public class StorageProxy implements StorageProxyMBean
             while (System.nanoTime() - queryStartNanoTime < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the 
start of each attempt
-                ReplicaLayout.ForPaxos replicaLayout = 
ReplicaLayout.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
+                ReplicaPlan.ForPaxosWrite replicaPlan = 
ReplicaPlans.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
 
-                final PaxosBallotAndContention pair = 
beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaLayout, 
consistencyForPaxos, consistencyForCommit, true, state);
+                final PaxosBallotAndContention pair = 
beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaPlan, 
consistencyForPaxos, consistencyForCommit, true, state);
                 final UUID ballot = pair.ballot;
                 contentions += pair.contentions;
 
@@ -276,7 +275,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 Commit proposal = Commit.newProposal(ballot, updates);
                 Tracing.trace("CAS precondition is met; proposing 
client-requested updates for {}", ballot);
-                if (proposePaxos(proposal, replicaLayout, true, 
queryStartNanoTime))
+                if (proposePaxos(proposal, replicaPlan, true, 
queryStartNanoTime))
                 {
                     commitPaxos(proposal, consistencyForCommit, true, 
queryStartNanoTime);
                     Tracing.trace("CAS successful");
@@ -334,7 +333,7 @@ public class StorageProxy implements StorageProxyMBean
     private static PaxosBallotAndContention beginAndRepairPaxos(long 
queryStartNanoTime,
                                                                 DecoratedKey 
key,
                                                                 TableMetadata 
metadata,
-                                                                
ReplicaLayout.ForPaxos replicaLayout,
+                                                                
ReplicaPlan.ForPaxosWrite paxosPlan,
                                                                 
ConsistencyLevel consistencyForPaxos,
                                                                 
ConsistencyLevel consistencyForCommit,
                                                                 final boolean 
isWrite,
@@ -360,7 +359,7 @@ public class StorageProxy implements StorageProxyMBean
             // prepare
             Tracing.trace("Preparing {}", ballot);
             Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-            summary = preparePaxos(toPrepare, replicaLayout, 
queryStartNanoTime);
+            summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);
             if (!summary.promised)
             {
                 Tracing.trace("Some replicas have already promised a higher 
ballot than ours; aborting");
@@ -383,7 +382,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                     casReadMetrics.unfinishedCommit.inc();
                 Commit refreshedInProgress = Commit.newProposal(ballot, 
inProgress.update);
-                if (proposePaxos(refreshedInProgress, replicaLayout, false, 
queryStartNanoTime))
+                if (proposePaxos(refreshedInProgress, paxosPlan, false, 
queryStartNanoTime))
                 {
                     try
                     {
@@ -440,12 +439,12 @@ public class StorageProxy implements StorageProxyMBean
             MessagingService.instance().sendOneWay(message, target);
     }
 
-    private static PrepareCallback preparePaxos(Commit toPrepare, 
ReplicaLayout.ForPaxos replicaLayout, long queryStartNanoTime)
+    private static PrepareCallback preparePaxos(Commit toPrepare, 
ReplicaPlan.ForPaxosWrite replicaPlan, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        PrepareCallback callback = new 
PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), 
replicaLayout.getRequiredParticipants(), replicaLayout.consistencyLevel(), 
queryStartNanoTime);
+        PrepareCallback callback = new 
PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), 
replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), 
queryStartNanoTime);
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, 
Commit.serializer);
-        for (Replica replica: replicaLayout.selected())
+        for (Replica replica: replicaPlan.contacts())
         {
             if (replica.isLocal())
             {
@@ -478,12 +477,12 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, 
ReplicaLayout.ForPaxos replicaLayout, boolean timeoutIfPartial, long 
queryStartNanoTime)
+    private static boolean proposePaxos(Commit proposal, 
ReplicaPlan.ForPaxosWrite replicaPlan, boolean timeoutIfPartial, long 
queryStartNanoTime)
     throws WriteTimeoutException
     {
-        ProposeCallback callback = new 
ProposeCallback(replicaLayout.selected().size(), 
replicaLayout.getRequiredParticipants(), !timeoutIfPartial, 
replicaLayout.consistencyLevel(), queryStartNanoTime);
+        ProposeCallback callback = new 
ProposeCallback(replicaPlan.contacts().size(), 
replicaPlan.requiredParticipants(), !timeoutIfPartial, 
replicaPlan.consistencyLevel(), queryStartNanoTime);
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, 
Commit.serializer);
-        for (Replica replica : replicaLayout.selected())
+        for (Replica replica : replicaPlan.contacts())
         {
             if (replica.isLocal())
             {
@@ -518,7 +517,7 @@ public class StorageProxy implements StorageProxyMBean
             return true;
 
         if (timeoutIfPartial && !callback.isFullyRefused())
-            throw new WriteTimeoutException(WriteType.CAS, 
replicaLayout.consistencyLevel(), callback.getAcceptCount(), 
replicaLayout.getRequiredParticipants());
+            throw new WriteTimeoutException(WriteType.CAS, 
replicaPlan.consistencyLevel(), callback.getAcceptCount(), 
replicaPlan.requiredParticipants());
 
         return false;
     }
@@ -531,21 +530,22 @@ public class StorageProxy implements StorageProxyMBean
         Token tk = proposal.update.partitionKey().getToken();
 
         AbstractWriteResponseHandler<Commit> responseHandler = null;
-        ReplicaLayout.ForToken replicaLayout = 
ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, 
FailureDetector.instance::isAlive);
+        // NOTE: this ReplicaPlan is a lie; this usage of ReplicaPlan could do with being clarified - the selected() collection is essentially (I think) never used
+        ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll);
         if (shouldBlock)
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-            responseHandler = rs.getWriteResponseHandler(replicaLayout, null, 
WriteType.SIMPLE, queryStartNanoTime);
+            responseHandler = rs.getWriteResponseHandler(replicaPlan, null, 
WriteType.SIMPLE, queryStartNanoTime);
             responseHandler.setSupportsBackPressure(false);
         }
 
-        MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, 
Commit.serializer);
-        for (Replica replica : replicaLayout.all())
+        MessageOut<Commit> message = new 
MessageOut<>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
+        for (Replica replica : replicaPlan.liveAndDown())
         {
             InetAddressAndPort destination = replica.endpoint();
             checkHintOverload(replica);
 
-            if (FailureDetector.instance.isAlive(destination))
+            if (replicaPlan.isAlive(replica))
             {
                 if (shouldBlock)
                 {
@@ -616,10 +616,10 @@ public class StorageProxy implements StorageProxyMBean
      * the data across to some other replica.
      *
      * @param mutations the mutations to be applied across the replicas
-     * @param consistency_level the consistency level for the operation
+     * @param consistencyLevel the consistency level for the operation
      * @param queryStartNanoTime the value of System.nanoTime() when the query 
started to be processed
      */
-    public static void mutate(List<? extends IMutation> mutations, 
ConsistencyLevel consistency_level, long queryStartNanoTime)
+    public static void mutate(List<? extends IMutation> mutations, 
ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException, 
WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
@@ -637,7 +637,7 @@ public class StorageProxy implements StorageProxyMBean
                 if (mutation instanceof CounterMutation)
                     
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, 
queryStartNanoTime));
                 else
-                    responseHandlers.add(performWrite(mutation, 
consistency_level, localDataCenter, standardWritePerformer, null, 
plainWriteType, queryStartNanoTime));
+                    responseHandlers.add(performWrite(mutation, 
consistencyLevel, localDataCenter, standardWritePerformer, null, 
plainWriteType, queryStartNanoTime));
             }
 
             // upgrade to full quorum any failed cheap quorums
@@ -653,7 +653,7 @@ public class StorageProxy implements StorageProxyMBean
         }
         catch (WriteTimeoutException|WriteFailureException ex)
         {
-            if (consistency_level == ConsistencyLevel.ANY)
+            if (consistencyLevel == ConsistencyLevel.ANY)
             {
                 hintMutations(mutations);
             }
@@ -662,7 +662,7 @@ public class StorageProxy implements StorageProxyMBean
                 if (ex instanceof WriteFailureException)
                 {
                     writeMetrics.failures.mark();
-                    writeMetricsMap.get(consistency_level).failures.mark();
+                    writeMetricsMap.get(consistencyLevel).failures.mark();
                     WriteFailureException fe = (WriteFailureException)ex;
                     Tracing.trace("Write failure; received {} of {} required 
replies, failed {} requests",
                                   fe.received, fe.blockFor, 
fe.failureReasonByEndpoint.size());
@@ -670,7 +670,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     writeMetrics.timeouts.mark();
-                    writeMetricsMap.get(consistency_level).timeouts.mark();
+                    writeMetricsMap.get(consistencyLevel).timeouts.mark();
                     WriteTimeoutException te = (WriteTimeoutException)ex;
                     Tracing.trace("Write timeout; received {} of {} required 
replies", te.received, te.blockFor);
                 }
@@ -680,14 +680,14 @@ public class StorageProxy implements StorageProxyMBean
         catch (UnavailableException e)
         {
             writeMetrics.unavailables.mark();
-            writeMetricsMap.get(consistency_level).unavailables.mark();
+            writeMetricsMap.get(consistencyLevel).unavailables.mark();
             Tracing.trace("Unavailable");
             throw e;
         }
         catch (OverloadedException e)
         {
             writeMetrics.unavailables.mark();
-            writeMetricsMap.get(consistency_level).unavailables.mark();
+            writeMetricsMap.get(consistencyLevel).unavailables.mark();
             Tracing.trace("Overloaded");
             throw e;
         }
@@ -695,7 +695,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             long latency = System.nanoTime() - startTime;
             writeMetrics.addNano(latency);
-            writeMetricsMap.get(consistency_level).addNano(latency);
+            writeMetricsMap.get(consistencyLevel).addNano(latency);
             updateCoordinatorWriteLatencyTableMetric(mutations, latency);
         }
     }
@@ -725,7 +725,8 @@ public class StorageProxy implements StorageProxyMBean
 
         // local writes can timeout, but cannot be dropped (see 
LocalMutationRunnable and CASSANDRA-6510),
         // so there is no need to hint or retry.
-        EndpointsForToken replicasToHint = 
StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, 
token)
+        EndpointsForToken replicasToHint = 
ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+                .all()
                 .filter(StorageProxy::shouldHint);
 
         submitHint(mutation, replicasToHint, null);
@@ -737,8 +738,8 @@ public class StorageProxy implements StorageProxyMBean
         Token token = mutation.key().getToken();
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
 
-        return 
StorageService.instance.getNaturalReplicasForToken(keyspaceName, 
token).endpoints().contains(local)
-               || 
StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, 
keyspaceName).endpoints().contains(local);
+        return 
ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+                .all().endpoints().contains(local);
     }
 
     /**
@@ -934,7 +935,6 @@ public class StorageProxy implements StorageProxyMBean
                                                                                
cleanup,
                                                                                
queryStartNanoTime);
                 // exit early if we can't fulfill the CL at this time.
-                wrapper.handler.assureSufficientLiveNodes();
                 wrappers.add(wrapper);
             }
 
@@ -1002,14 +1002,14 @@ public class StorageProxy implements StorageProxyMBean
     throws WriteTimeoutException, WriteFailureException
     {
         Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-        ReplicaLayout.ForToken replicaLayout = 
ReplicaLayout.forBatchlogWrite(systemKeypsace, endpoints);
-        WriteResponseHandler<?> handler = new 
WriteResponseHandler(replicaLayout,
+        ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints);
+        WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan,
                                                                    
WriteType.BATCH_LOG,
                                                                    
queryStartNanoTime);
 
         Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), 
mutations);
         MessageOut<Batch> message = new 
MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
-        for (Replica replica : replicaLayout.all())
+        for (Replica replica : replicaPlan.liveAndDown())
         {
             logger.trace("Sending batchlog store request {} to {} for {} 
mutations", batch.id, replica, batch.size());
 
@@ -1040,12 +1040,12 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); 
 // TODO: CASSANDRA-14549
-            ReplicaLayout.ForToken replicas = 
wrapper.handler.replicaLayout.withSelected(wrapper.handler.replicaLayout.all());
+            
Replicas.temporaryAssertFull(wrapper.handler.replicaPlan.liveAndDown());  // 
TODO: CASSANDRA-14549
+            ReplicaPlan.ForTokenWrite replicas = 
wrapper.handler.replicaPlan.withContact(wrapper.handler.replicaPlan.liveAndDown());
 
             try
             {
-                sendToHintedReplicas(wrapper.mutation, replicas.selected(), 
wrapper.handler, localDataCenter, stage);
+                sendToHintedReplicas(wrapper.mutation, replicas, 
wrapper.handler, localDataCenter, stage);
             }
             catch (OverloadedException | WriteTimeoutException e)
             {
@@ -1059,11 +1059,11 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); 
// TODO: CASSANDRA-14549
-            sendToHintedReplicas(wrapper.mutation, 
wrapper.handler.replicaLayout.all(), wrapper.handler, localDataCenter, stage);
+            EndpointsForToken sendTo = 
wrapper.handler.replicaPlan.liveAndDown();
+            Replicas.temporaryAssertFull(sendTo); // TODO: CASSANDRA-14549
+            sendToHintedReplicas(wrapper.mutation, 
wrapper.handler.replicaPlan.withContact(sendTo), wrapper.handler, 
localDataCenter, stage);
         }
 
-
         for (WriteResponseHandlerWrapper wrapper : wrappers)
             wrapper.handler.get();
     }
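
Both batched-write paths above use the same idiom: take the plan's full candidate set and derive a plan whose contacts are exactly those replicas. A condensed sketch (down contacts are expected to be hinted by sendToHintedReplicas rather than skipped):

    ReplicaPlan.ForTokenWrite plan = wrapper.handler.replicaPlan;
    ReplicaPlan.ForTokenWrite contactAll = plan.withContact(plan.liveAndDown());
    sendToHintedReplicas(wrapper.mutation, contactAll, wrapper.handler, localDataCenter, stage);
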
@@ -1096,13 +1096,10 @@ public class StorageProxy implements StorageProxyMBean
 
         Token tk = mutation.key().getToken();
 
-        ReplicaLayout.ForToken replicaLayout = 
ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk);
-        AbstractWriteResponseHandler<IMutation> responseHandler = 
rs.getWriteResponseHandler(replicaLayout, callback, writeType, 
queryStartNanoTime);
+        ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+        AbstractWriteResponseHandler<IMutation> responseHandler = 
rs.getWriteResponseHandler(replicaPlan, callback, writeType, 
queryStartNanoTime);
 
-        // exit early if we can't fulfill the CL at this time
-        responseHandler.assureSufficientLiveNodes();
-
-        performer.apply(mutation, replicaLayout, responseHandler, 
localDataCenter);
+        performer.apply(mutation, replicaPlan, responseHandler, 
localDataCenter);
         return responseHandler;
     }
 
@@ -1118,8 +1115,8 @@ public class StorageProxy implements StorageProxyMBean
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         Token tk = mutation.key().getToken();
 
-        ReplicaLayout.ForToken replicaLayout = 
ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, 
FailureDetector.instance::isAlive);
-        AbstractWriteResponseHandler<IMutation> writeHandler = 
rs.getWriteResponseHandler(replicaLayout,null, writeType, queryStartNanoTime);
+        ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+        AbstractWriteResponseHandler<IMutation> writeHandler = 
rs.getWriteResponseHandler(replicaPlan,null, writeType, queryStartNanoTime);
         BatchlogResponseHandler<IMutation> batchHandler = new 
BatchlogResponseHandler<>(writeHandler, 
batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
@@ -1140,10 +1137,11 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-        Token tk = mutation.key().getToken();
-        ReplicaLayout.ForToken replicaLayout = 
ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk, 
naturalEndpoints, pendingEndpoints);
 
-        AbstractWriteResponseHandler<IMutation> writeHandler = 
rs.getWriteResponseHandler(replicaLayout, () -> {
+        ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints);
+        ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown, 
ReplicaPlans.writeAll);
+
+        AbstractWriteResponseHandler<IMutation> writeHandler = 
rs.getWriteResponseHandler(replicaPlan, () -> {
             long delay = Math.max(0, System.currentTimeMillis() - 
baseComplete.get());
             viewWriteMetrics.viewWriteLatency.update(delay, 
TimeUnit.MILLISECONDS);
         }, writeType, queryStartNanoTime);
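
Two forWrite entry points are now in play: one takes a token and computes the candidate replicas itself, the other (used for this view write) reuses natural and pending replicas the caller already holds. The selector argument, writeNormal or writeAll, appears to control which candidates become contacts. A sketch with placeholder consistency levels:

    // derive candidates from the token:
    ReplicaPlan.ForTokenWrite normal = ReplicaPlans.forWrite(keyspace, ConsistencyLevel.QUORUM, token, ReplicaPlans.writeNormal);
    // reuse precomputed natural + pending replicas and contact all of them:
    ReplicaLayout.ForTokenWrite layout = ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints);
    ReplicaPlan.ForTokenWrite all = ReplicaPlans.forWrite(keyspace, consistencyLevel, layout, ReplicaPlans.writeAll);
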
@@ -1208,7 +1206,7 @@ public class StorageProxy implements StorageProxyMBean
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
     public static void sendToHintedReplicas(final Mutation mutation,
-                                            EndpointsForToken targets,
+                                            ReplicaPlan.ForTokenWrite plan,
                                             AbstractWriteResponseHandler<IMutation> responseHandler,
                                             String localDataCenter,
                                             Stage stage)
@@ -1227,11 +1225,11 @@ public class StorageProxy implements StorageProxyMBean
 
         List<InetAddressAndPort> backPressureHosts = null;
 
-        for (Replica destination : targets)
+        for (Replica destination : plan.contacts())
         {
             checkHintOverload(destination);
 
-            if (FailureDetector.instance.isAlive(destination.endpoint()))
+            if (plan.isAlive(destination))
             {
                 if (destination.isLocal())
                 {
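
A condensed sketch of the contact loop after this change; forwardOrApply and submitHint below are stand-ins for what the rest of the method does and are not quoted from the patch:

    for (Replica destination : plan.contacts())
    {
        if (plan.isAlive(destination))
            forwardOrApply(mutation, destination);   // stand-in: local apply, same-DC send, or cross-DC forward
        else
            submitHint(mutation, destination);       // stand-in: hint the down replica instead of contacting it
    }
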
@@ -1251,7 +1249,7 @@ public class StorageProxy implements StorageProxyMBean
                     if (localDataCenter.equals(dc))
                     {
                         if (localDc == null)
-                            localDc = new ArrayList<>(targets.size());
+                            localDc = new ArrayList<>(plan.contacts().size());
 
                         localDc.add(destination);
                     }
@@ -1268,7 +1266,7 @@ public class StorageProxy implements StorageProxyMBean
                     }
 
                     if (backPressureHosts == null)
-                        backPressureHosts = new ArrayList<>(targets.size());
+                        backPressureHosts = new 
ArrayList<>(plan.contacts().size());
 
                     backPressureHosts.add(destination.endpoint());
                 }
@@ -1345,7 +1343,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                   message,
                                                                  destination,
                                                                  message.getTimeout(),
-                                                                  handler.replicaLayout.consistencyLevel(),
+                                                                  handler.replicaPlan.consistencyLevel(),
                                                                   true);
             messageIds[idIdx++] = id;
             logger.trace("Adding FWD message to {}@{}", id, destination);
@@ -1435,14 +1433,13 @@ public class StorageProxy implements StorageProxyMBean
             // Exit now if we can't fulfill the CL here instead of forwarding 
to the leader replica
             String keyspaceName = cm.getKeyspaceName();
             Keyspace keyspace = Keyspace.open(keyspaceName);
-            AbstractReplicationStrategy rs = 
Keyspace.open(keyspaceName).getReplicationStrategy();
             Token tk = cm.key().getToken();
 
-            ReplicaLayout.ForToken replicaLayout = 
ReplicaLayout.forWriteWithDownNodes(keyspace, cm.consistency(), tk);
-            rs.getWriteResponseHandler(replicaLayout, null, WriteType.COUNTER, 
queryStartNanoTime).assureSufficientLiveNodes();
+            ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, 
ReplicaPlans.writeAll)
+                    .assureSufficientReplicas();
 
             // Forward the actual update to the chosen leader replica
-            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaLayout.forCounterWrite(keyspace, tk, replica),
+            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica),
                                                                                                  WriteType.COUNTER, queryStartNanoTime);
 
             Tracing.trace("Enqueuing counter update to {}", replica);
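
The counter-leader path now does this in two steps: assert availability against a writeAll plan for the full replica set, then forward the mutation under a dedicated single-replica plan. A sketch mirroring the hunk above, with placeholder names:

    ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll).assureSufficientReplicas();
    AbstractWriteResponseHandler<IMutation> leaderHandler =
        new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica), WriteType.COUNTER, queryStartNanoTime);
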
@@ -1465,7 +1462,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(keyspaceName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        EndpointsForToken replicas = 
StorageService.instance.getLiveNaturalReplicasForToken(keyspace, key);
+        EndpointsForToken replicas = 
keyspace.getReplicationStrategy().getNaturalReplicasForToken(key);
 
         // CASSANDRA-13043: filter out those endpoints not accepting clients 
yet, maybe because still bootstrapping
         replicas = replicas.filter(replica -> 
StorageService.instance.isRpcReady(replica.endpoint()));
@@ -1511,7 +1508,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,
-                                             final EndpointsForToken targets,
+                                             final ReplicaPlan.ForTokenWrite 
replicaPlan,
                                              final 
AbstractWriteResponseHandler<IMutation> responseHandler,
                                              final String localDataCenter)
     {
@@ -1524,7 +1521,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 Mutation result = ((CounterMutation) 
mutation).applyCounterMutation();
                 responseHandler.response(null);
-                sendToHintedReplicas(result, targets, responseHandler, 
localDataCenter, Stage.COUNTER_MUTATION);
+                sendToHintedReplicas(result, replicaPlan, responseHandler, 
localDataCenter, Stage.COUNTER_MUTATION);
             }
         };
     }
@@ -1592,7 +1589,7 @@ public class StorageProxy implements StorageProxyMBean
         try
         {
             // make sure any in-progress paxos writes are done (i.e., 
committed to a majority of replicas), before performing a quorum read
-            ReplicaLayout.ForPaxos replicaLayout = 
ReplicaLayout.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
+            ReplicaPlan.ForPaxosWrite replicaPlan = 
ReplicaPlans.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
 
             // does the work of applying in-progress writes; throws UAE or 
timeout if it can't
             final ConsistencyLevel consistencyForCommitOrFetch = 
consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
@@ -1601,7 +1598,7 @@ public class StorageProxy implements StorageProxyMBean
 
             try
             {
-                final PaxosBallotAndContention pair = 
beginAndRepairPaxos(start, key, metadata, replicaLayout, consistencyLevel, 
consistencyForCommitOrFetch, false, state);
+                final PaxosBallotAndContention pair = 
beginAndRepairPaxos(start, key, metadata, replicaPlan, consistencyLevel, 
consistencyForCommitOrFetch, false, state);
                 if (pair.contentions > 0)
                     casReadMetrics.contention.update(pair.contentions);
             }
@@ -1850,21 +1847,6 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static EndpointsForToken getLiveSortedReplicasForToken(Keyspace 
keyspace, RingPosition pos)
-    {
-        return getLiveSortedReplicas(keyspace, pos).forToken(pos.getToken());
-    }
-
-    public static EndpointsForRange getLiveSortedReplicas(Keyspace keyspace, 
RingPosition pos)
-    {
-        EndpointsForRange liveReplicas = 
StorageService.instance.getLiveNaturalReplicas(keyspace, pos);
-        // Replica availability is considered by the query path
-        Preconditions.checkState(liveReplicas.isEmpty() || 
liveReplicas.stream().anyMatch(Replica::isFull),
-                                 "At least one full replica required for 
reads: " + liveReplicas);
-
-        return 
DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(),
 liveReplicas);
-    }
-
     /**
      * Estimate the number of result rows per range in the ring based on our 
local data.
      * <p>
@@ -1883,7 +1865,7 @@ public class StorageProxy implements StorageProxyMBean
         return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / 
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
     }
 
-    private static class RangeIterator extends 
AbstractIterator<ReplicaLayout.ForRange>
+    private static class RangeIterator extends 
AbstractIterator<ReplicaPlan.ForRangeRead>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
@@ -1907,43 +1889,34 @@ public class StorageProxy implements StorageProxyMBean
             return rangeCount;
         }
 
-        protected ReplicaLayout.ForRange computeNext()
+        protected ReplicaPlan.ForRangeRead computeNext()
         {
             if (!ranges.hasNext())
                 return endOfData();
 
-            AbstractBounds<PartitionPosition> range = ranges.next();
-            EndpointsForRange liveReplicas = getLiveSortedReplicas(keyspace, 
range.right);
-
-            int blockFor = consistency.blockFor(keyspace);
-            EndpointsForRange targetReplicas = 
consistency.filterForQuery(keyspace, liveReplicas);
-            int minResponses = Math.min(targetReplicas.size(), blockFor);
-
-            // Endpoints for range here as well
-            return ReplicaLayout.forRangeRead(keyspace, consistency, range,
-                                              liveReplicas, 
targetReplicas.subList(0, minResponses));
+            return ReplicaPlans.forRangeRead(keyspace, consistency, 
ranges.next());
         }
     }
 
-    private static class RangeMerger extends 
AbstractIterator<ReplicaLayout.ForRange>
+    private static class RangeMerger extends 
AbstractIterator<ReplicaPlan.ForRangeRead>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
-        private final PeekingIterator<ReplicaLayout.ForRange> ranges;
+        private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
 
-        private RangeMerger(Iterator<ReplicaLayout.ForRange> iterator, 
Keyspace keyspace, ConsistencyLevel consistency)
+        private RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, 
Keyspace keyspace, ConsistencyLevel consistency)
         {
             this.keyspace = keyspace;
             this.consistency = consistency;
             this.ranges = Iterators.peekingIterator(iterator);
         }
 
-        protected ReplicaLayout.ForRange computeNext()
+        protected ReplicaPlan.ForRangeRead computeNext()
         {
             if (!ranges.hasNext())
                 return endOfData();
 
-            ReplicaLayout.ForRange current = ranges.next();
+            ReplicaPlan.ForRangeRead current = ranges.next();
 
             // getRestrictedRange has broken the queried range into 
per-[vnode] token ranges, but this doesn't take
             // the replication factor into account. If the intersection of 
live endpoints for 2 consecutive ranges
@@ -1955,25 +1928,15 @@ public class StorageProxy implements StorageProxyMBean
                 // Note: it would be slightly more efficient to have 
CFS.getRangeSlice on the destination nodes unwraps
                 // the range if necessary and deal with it. However, we can't 
start sending wrapped range without breaking
                // wire compatibility, so it's likely easier not to bother;
-                if (current.range.right.isMinimum())
+                if (current.range().right.isMinimum())
                     break;
 
-                ReplicaLayout.ForRange next = ranges.peek();
-
-                EndpointsForRange merged = 
current.all().keep(next.all().endpoints());
-
-                // Check if there is enough endpoint for the merge to be 
possible.
-                if (!consistency.isSufficientLiveNodesForRead(keyspace, 
merged))
-                    break;
-
-                EndpointsForRange filteredMerged = 
consistency.filterForQuery(keyspace, merged);
-
-                // Estimate whether merging will be a win or not
-                if 
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged,
 current.selected(), next.selected()))
+                ReplicaPlan.ForRangeRead next = ranges.peek();
+                ReplicaPlan.ForRangeRead merged = 
ReplicaPlans.maybeMerge(keyspace, consistency, current, next);
+                if (merged == null)
                     break;
 
-                // If we get there, merge this range and the next one
-                current = ReplicaLayout.forRangeRead(keyspace, consistency, 
current.range.withNewRight(next.range.right), merged, filteredMerged);
+                current = merged;
                 ranges.next(); // consume the range we just merged since we've 
only peeked so far
             }
             return current;
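
The merge decision is now delegated entirely to ReplicaPlans.maybeMerge, which is assumed to return null when the two adjacent plans cannot (or should not) be combined. A sketch of the resulting loop contract:

    ReplicaPlan.ForRangeRead current = ranges.next();
    while (ranges.hasNext())
    {
        ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, ranges.peek());
        if (merged == null)
            break;                 // not enough common replicas, or merging is judged not worth it
        current = merged;
        ranges.next();             // consume the range we just merged
    }
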
@@ -2018,7 +1981,7 @@ public class StorageProxy implements StorageProxyMBean
 
     private static class RangeCommandIterator extends 
AbstractIterator<RowIterator> implements PartitionIterator
     {
-        private final Iterator<ReplicaLayout.ForRange> ranges;
+        private final Iterator<ReplicaPlan.ForRangeRead> ranges;
         private final int totalRangeCount;
         private final PartitionRangeReadCommand command;
         private final boolean enforceStrictLiveness;
@@ -2108,42 +2071,40 @@ public class StorageProxy implements StorageProxyMBean
         /**
          * Queries the provided sub-range.
          *
-         * @param replicaLayout the subRange to query.
+         * @param replicaPlan the subRange to query.
          * @param isFirst in the case where multiple queries are sent in 
parallel, whether that's the first query on
         * that batch or not. The reason it matters is that when paging 
queries, the command (more specifically the
          * {@code DataLimits}) may have "state" information and that state may 
only be valid for the first query (in
          * that it's the query that "continues" whatever we're previously 
queried).
          */
-        private SingleRangeResponse query(ReplicaLayout.ForRange 
replicaLayout, boolean isFirst)
+        private SingleRangeResponse query(ReplicaPlan.ForRangeRead 
replicaPlan, boolean isFirst)
         {
-            PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaLayout.range, isFirst);
-            ReadRepair<EndpointsForRange, ReplicaLayout.ForRange> readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
-            DataResolver<EndpointsForRange, ReplicaLayout.ForRange> resolver = new DataResolver<>(rangeCommand, replicaLayout, readRepair, queryStartNanoTime);
-            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-
-            ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
-                                                                                                 replicaLayout.consistencyLevel().blockFor(keyspace),
-                                                                                                 rangeCommand,
-                                                                                                 replicaLayout,
-                                                                                                 queryStartNanoTime);
+            PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
+            ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
+            ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
+                    = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
+            DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
+                    = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+            ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
+                    = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
 
-            handler.assureSufficientLiveNodes();
+            replicaPlan.assureSufficientReplicas();
 
             // If enabled, request repaired data tracking info from full 
replicas but
             // only if there are multiple full replicas to compare results from
            if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()
-                && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+                    && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
             {
                 command.trackRepairedStatus();
             }
 
-            if (replicaLayout.selected().size() == 1 && 
replicaLayout.selected().get(0).isLocal())
+            if (replicaPlan.contacts().size() == 1 && 
replicaPlan.contacts().get(0).isLocal())
             {
                 StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(rangeCommand, handler));
             }
             else
             {
-                for (Replica replica : replicaLayout.selected())
+                for (Replica replica : replicaPlan.contacts())
                 {
                     Tracing.trace("Enqueuing request to {}", replica);
                     PartitionRangeReadCommand command = replica.isFull() ? 
rangeCommand : rangeCommand.copyAsTransientQuery();
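
One detail worth pulling out of the hunk above: transient replicas receive a cheaper copy of the range command, while full replicas get the original. The send call below is a placeholder for the actual messaging:

    for (Replica replica : replicaPlan.contacts())
    {
        PartitionRangeReadCommand toSend = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery();
        send(toSend, replica, handler);   // placeholder for the real MessagingService call
    }
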
@@ -2486,7 +2447,7 @@ public class StorageProxy implements StorageProxyMBean
     public interface WritePerformer
     {
         public void apply(IMutation mutation,
-                          ReplicaLayout.ForToken targets,
+                          ReplicaPlan.ForTokenWrite targets,
                           AbstractWriteResponseHandler<IMutation> 
responseHandler,
                           String localDataCenter) throws OverloadedException;
     }
@@ -2499,7 +2460,7 @@ public class StorageProxy implements StorageProxyMBean
         public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> 
writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime)
         {
             super(writeHandler, i, cleanup, queryStartNanoTime);
-            viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+            viewWriteMetrics.viewReplicasAttempted.inc(candidateReplicaCount());
         }
 
         public void response(MessageIn<IMutation> msg)
@@ -2705,7 +2666,7 @@ public class StorageProxy implements StorageProxyMBean
                 HintsService.instance.write(hostIds, Hint.create(mutation, 
System.currentTimeMillis()));
                 
validTargets.forEach(HintsService.instance.metrics::incrCreatedHints);
                 // Notify the handler only for CL == ANY
-                if (responseHandler != null && 
responseHandler.replicaLayout.consistencyLevel() == ConsistencyLevel.ANY)
+                if (responseHandler != null && 
responseHandler.replicaPlan.consistencyLevel() == ConsistencyLevel.ANY)
                     responseHandler.response(null);
             }
         };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 7f4ae14..a979f1c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -172,7 +172,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public RangesAtEndpoint getLocalReplicas(String keyspaceName)
     {
-        return getReplicasForEndpoint(keyspaceName, 
FBUtilities.getBroadcastAddressAndPort());
+        return Keyspace.open(keyspaceName).getReplicationStrategy()
+                .getAddressReplicas(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public List<Range<Token>> getLocalAndPendingRanges(String ks)
@@ -2015,11 +2016,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     */
     private EndpointsByRange constructRangeToEndpointMap(String keyspace, 
List<Range<Token>> ranges)
     {
+        AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
         Map<Range<Token>, EndpointsForRange> rangeToEndpointMap = new 
HashMap<>(ranges.size());
         for (Range<Token> range : ranges)
-        {
-            rangeToEndpointMap.put(range, 
Keyspace.open(keyspace).getReplicationStrategy().getNaturalReplicas(range.right));
-        }
+            rangeToEndpointMap.put(range, 
strategy.getNaturalReplicas(range.right));
         return new EndpointsByRange(rangeToEndpointMap);
     }
 
@@ -3878,16 +3878,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Get all ranges an endpoint is responsible for (by keyspace)
-     * @param ep endpoint we are interested in.
-     * @return ranges for the specified endpoint.
-     */
-    RangesAtEndpoint getReplicasForEndpoint(String keyspaceName, 
InetAddressAndPort ep)
-    {
-        return 
Keyspace.open(keyspaceName).getReplicationStrategy().getAddressReplicas(ep);
-    }
-
-    /**
      * Get all ranges that span the ring given a set
      * of tokens. All ranges are in sorted order of
      * ranges.
@@ -3936,11 +3926,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, cf, 
key), true);
     }
 
-
     @Deprecated
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, 
ByteBuffer key)
     {
-        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, 
tokenMetadata.partitioner.getToken(key));
+        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, 
key);
         List<InetAddress> inetList = new ArrayList<>(replicas.size());
         replicas.forEach(r -> inetList.add(r.endpoint().address));
         return inetList;
@@ -3948,7 +3937,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public List<String> getNaturalEndpointsWithPort(String keyspaceName, 
ByteBuffer key)
     {
-        return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, 
tokenMetadata.partitioner.getToken(key)), true);
+        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, 
key);
+        return Replicas.stringify(replicas, true);
     }
 
     public List<String> getReplicas(String keyspaceName, String cf, String key)
@@ -3971,61 +3961,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         if (metadata == null)
             throw new IllegalArgumentException("Unknown table '" + cf + "' in 
keyspace '" + keyspaceName + "'");
 
-        return getNaturalReplicasForToken(keyspaceName, 
tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
-    }
-
-    /**
-     * This method returns the N endpoints that are responsible for storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspaceName keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     * @return the endpoint responsible for this token
-     */
-    public static EndpointsForToken getNaturalReplicasForToken(String 
keyspaceName, RingPosition pos)
-    {
-        return 
Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(pos);
+        return getNaturalReplicasForToken(keyspaceName, 
metadata.partitionKeyType.fromString(key));
     }
 
-    /**
-     * Returns the endpoints currently responsible for storing the token plus 
pending ones
-     */
-    public EndpointsForToken getNaturalAndPendingReplicasForToken(String 
keyspaceName, Token token)
-    {
-        // TODO: race condition to fetch these. impliciations??
-        EndpointsForToken natural = getNaturalReplicasForToken(keyspaceName, 
token);
-        EndpointsForToken pending = 
tokenMetadata.pendingEndpointsForToken(token, keyspaceName);
-        if (Endpoints.haveConflicts(natural, pending))
-        {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
-        }
-        return Endpoints.concat(natural, pending);
-    }
-
-    /**
-     * This method attempts to return N endpoints that are responsible for 
storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspace keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     */
-    public EndpointsForToken getLiveNaturalReplicasForToken(Keyspace keyspace, 
RingPosition pos)
-    {
-        return getLiveNaturalReplicas(keyspace, pos).forToken(pos.getToken());
-    }
-
-    /**
-     * This method attempts to return N endpoints that are responsible for 
storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspace keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     */
-    public EndpointsForRange getLiveNaturalReplicas(Keyspace keyspace, 
RingPosition pos)
+    public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, 
ByteBuffer key)
     {
-        EndpointsForRange replicas = 
keyspace.getReplicationStrategy().getNaturalReplicas(pos);
-        return replicas.filter(r -> 
FailureDetector.instance.isAlive(r.endpoint()));
+        Token token = tokenMetadata.partitioner.getToken(key);
+        return 
Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token);
     }
 
     public void setLoggingLevel(String classQualifier, String rawLevel) throws 
Exception
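
The consolidated lookup now takes the raw key bytes and tokenizes them internally. A hypothetical call, assuming ByteBufferUtil for the key encoding and a keyspace named "ks1" (both placeholders):

    ByteBuffer key = ByteBufferUtil.bytes("some-partition-key");
    EndpointsForToken replicas = StorageService.instance.getNaturalReplicasForToken("ks1", key);
    replicas.forEach(r -> logger.info("replica {}", r.endpoint()));
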
@@ -4268,7 +4210,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                              .filter(endpoint -> 
FailureDetector.instance.isAlive(endpoint) && 
!FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
                              .collect(Collectors.toList());
 
-        return 
EndpointsForRange.copyOf(SystemReplicas.getSystemReplicas(endpoints));
+        return SystemReplicas.getSystemReplicas(endpoints);
     }
     /**
      * Find the best target to stream hints to. Currently the closest peer 
according to the snitch

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index a07aae6..f9bfedf 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -19,9 +19,7 @@ package org.apache.cassandra.service;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,18 +37,18 @@ public class WriteResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
     private static final AtomicIntegerFieldUpdater<WriteResponseHandler> 
responsesUpdater
             = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, 
"responses");
 
-    public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                 Runnable callback,
                                 WriteType writeType,
                                 long queryStartNanoTime)
     {
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        responses = totalBlockFor();
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        responses = blockFor();
     }
 
-    public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, 
WriteType writeType, long queryStartNanoTime)
+    public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, 
WriteType writeType, long queryStartNanoTime)
     {
-        this(replicaLayout, null, writeType, queryStartNanoTime);
+        this(replicaPlan, null, writeType, queryStartNanoTime);
     }
 
     public void response(MessageIn<T> m)
@@ -65,7 +63,7 @@ public class WriteResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
 
     protected int ackCount()
     {
-        return totalBlockFor() - responses;
+        return blockFor() - responses;
     }
 
     public boolean isLatencyForSnitch()
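
Construction now takes the plan directly; a hypothetical call site, with the write type and timestamp as placeholders. The responses field starts at blockFor() and counts down, so ackCount() is blockFor() minus the remaining count:

    WriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(replicaPlan, WriteType.SIMPLE, System.nanoTime());
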

