ReplicaPlan/Layout refactor follow-up/completion

Finish much of the work to clarify endpoint selection
that was begun in Transient Replication (CASSANDRA-14404)

Also fixes:
  - commitPaxos was incorrectly selecting only live nodes,
    when it needed to include down nodes as well
  - We were not writing to pending transient replicas
  - On write, we were not hinting to full nodes with transient
    replication
  - rr.maybeSendAdditional{Reads,Writes} would only consult the
    same node we may have speculated a read to
  - transient->full movements mishandled consistency level upgrade by
    retaining the 'full' pending variant, which increased CL requirement;
    instead, the 'natural' replica is upgraded to 'full' for writes

patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for 
CASSANDRA-14705


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/047bcd7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/047bcd7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/047bcd7a

Branch: refs/heads/trunk
Commit: 047bcd7ad171d6a4aa89128c5e6c6ed5f012b1c0
Parents: 05dbb3e
Author: Benedict Elliott Smith <bened...@apple.com>
Authored: Fri Sep 7 11:41:28 2018 +0100
Committer: Benedict Elliott Smith <bened...@apple.com>
Committed: Fri Sep 14 10:14:37 2018 +0100

----------------------------------------------------------------------
 .../cassandra/batchlog/BatchlogManager.java     |  64 ++-
 .../apache/cassandra/db/ConsistencyLevel.java   |  26 +-
 .../apache/cassandra/gms/FailureDetector.java   |   4 +
 .../apache/cassandra/hints/HintsService.java    |   4 +-
 .../locator/AbstractReplicaCollection.java      |  67 +--
 .../locator/AbstractReplicationStrategy.java    |  20 +-
 .../org/apache/cassandra/locator/Endpoints.java |  59 +--
 .../cassandra/locator/EndpointsForRange.java    |   2 +-
 .../cassandra/locator/EndpointsForToken.java    |   2 +-
 .../cassandra/locator/RangesAtEndpoint.java     |   2 +-
 .../cassandra/locator/ReplicaCollection.java    |  27 +-
 .../apache/cassandra/locator/ReplicaLayout.java | 435 ++++++++-----------
 .../apache/cassandra/locator/ReplicaPlan.java   | 240 ++++++++++
 .../apache/cassandra/locator/ReplicaPlans.java  | 295 +++++++++++++
 .../cassandra/locator/SystemReplicas.java       |  12 +-
 .../apache/cassandra/locator/TokenMetadata.java |   8 +-
 .../apache/cassandra/net/IAsyncCallback.java    |  10 -
 .../service/AbstractWriteResponseHandler.java   |  50 +--
 .../cassandra/service/ActiveRepairService.java  |   2 +-
 .../service/BatchlogResponseHandler.java        |  15 +-
 .../DatacenterSyncWriteResponseHandler.java     |  12 +-
 .../service/DatacenterWriteResponseHandler.java |  10 +-
 .../apache/cassandra/service/StorageProxy.java  | 239 +++++-----
 .../cassandra/service/StorageService.java       |  82 +---
 .../cassandra/service/WriteResponseHandler.java |  16 +-
 .../service/reads/AbstractReadExecutor.java     |  78 ++--
 .../cassandra/service/reads/DataResolver.java   |  26 +-
 .../cassandra/service/reads/DigestResolver.java |  22 +-
 .../cassandra/service/reads/ReadCallback.java   |  62 ++-
 .../service/reads/ResponseResolver.java         |  22 +-
 .../reads/ShortReadPartitionsProtection.java    |  19 +-
 .../reads/repair/AbstractReadRepair.java        |  42 +-
 .../reads/repair/BlockingPartitionRepair.java   |  29 +-
 .../reads/repair/BlockingReadRepair.java        |  25 +-
 .../service/reads/repair/NoopReadRepair.java    |  11 +-
 .../repair/PartitionIteratorMergeListener.java  |  16 +-
 .../reads/repair/ReadOnlyReadRepair.java        |  13 +-
 .../service/reads/repair/ReadRepair.java        |  17 +-
 .../reads/repair/ReadRepairDiagnostics.java     |  11 +-
 .../service/reads/repair/ReadRepairEvent.java   |   2 +-
 .../reads/repair/ReadRepairStrategy.java        |  11 +-
 .../reads/repair/RowIteratorMergeListener.java  |  21 +-
 .../locator/ReplicaCollectionTest.java          |  89 ++--
 .../service/WriteResponseHandlerTest.java       |   4 +-
 .../WriteResponseHandlerTransientTest.java      |  71 ++-
 .../service/reads/DataResolverTest.java         |  21 +-
 .../reads/DataResolverTransientTest.java        | 227 ++++++++++
 .../service/reads/DigestResolverTest.java       |   5 +-
 .../service/reads/ReadExecutorTest.java         |   7 +-
 .../reads/repair/AbstractReadRepairTest.java    |  32 +-
 .../reads/repair/BlockingReadRepairTest.java    |  36 +-
 .../DiagEventsBlockingReadRepairTest.java       |  26 +-
 .../reads/repair/InstrumentedReadRepair.java    |   4 +-
 .../reads/repair/ReadOnlyReadRepairTest.java    |  24 +-
 .../service/reads/repair/ReadRepairTest.java    |  12 +-
 .../reads/repair/TestableReadRepair.java        |  18 +-
 56 files changed, 1655 insertions(+), 1051 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java 
b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 8dda54e..77f725c 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -26,12 +26,9 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.EndpointsForToken;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +51,8 @@ import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -459,36 +458,34 @@ public class BatchlogManager implements 
BatchlogManagerMBean
             Keyspace keyspace = Keyspace.open(ks);
             Token tk = mutation.key().getToken();
 
-            EndpointsForToken replicas = 
StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk);
-            Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549
+            // TODO: this logic could do with revisiting at some point, as it 
is unclear what its rationale is
+            // we perform a local write, ignoring errors and inline in this 
thread (potentially slowing replay down)
+            // effectively bumping CL for locally owned writes and also 
potentially stalling log replay if an error occurs
+            // once we decide how it should work, it can also probably be 
simplified, and avoid constructing a ReplicaPlan directly
+            ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+            Replicas.temporaryAssertFull(liveAndDown.all()); // TODO in 
CASSANDRA-14549
+
+            Replica selfReplica = liveAndDown.all().selfIfPresent();
+            if (selfReplica != null)
+                mutation.apply();
 
-            EndpointsForToken.Builder liveReplicasBuilder = 
EndpointsForToken.builder(tk);
-            for (Replica replica : replicas)
+            ReplicaLayout.ForTokenWrite liveRemoteOnly = liveAndDown.filter(
+                    r -> FailureDetector.isReplicaAlive.test(r) && r != 
selfReplica);
+
+            for (Replica replica : liveAndDown.all())
             {
-                if (replica.isLocal())
-                {
-                    mutation.apply();
-                }
-                else if (FailureDetector.instance.isAlive(replica.endpoint()))
-                {
-                    liveReplicasBuilder.add(replica); // will try delivering 
directly instead of writing a hint.
-                }
-                else
-                {
-                    hintedNodes.add(replica.endpoint());
-                    
HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
-                                                Hint.create(mutation, 
writtenAt));
-                }
+                if (replica == selfReplica || 
liveRemoteOnly.all().contains(replica))
+                    continue;
+                hintedNodes.add(replica.endpoint());
+                
HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
+                        Hint.create(mutation, writtenAt));
             }
 
-            EndpointsForToken liveReplicas = liveReplicasBuilder.build();
-            if (liveReplicas.isEmpty())
-                return null;
-
-            Replicas.temporaryAssertFull(liveReplicas);
-            ReplayWriteResponseHandler<Mutation> handler = new 
ReplayWriteResponseHandler<>(keyspace, liveReplicas, System.nanoTime());
+            ReplicaPlan.ForTokenWrite replicaPlan = new 
ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE,
+                    liveRemoteOnly.pending(), liveRemoteOnly.all(), 
liveRemoteOnly.all(), liveRemoteOnly.all());
+            ReplayWriteResponseHandler<Mutation> handler = new 
ReplayWriteResponseHandler<>(replicaPlan, System.nanoTime());
             MessageOut<Mutation> message = mutation.createMessage();
-            for (Replica replica : liveReplicas)
+            for (Replica replica : liveRemoteOnly.all())
                 MessagingService.instance().sendWriteRR(message, replica, 
handler, false);
             return handler;
         }
@@ -509,17 +506,16 @@ public class BatchlogManager implements 
BatchlogManagerMBean
         {
             private final Set<InetAddressAndPort> undelivered = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-            ReplayWriteResponseHandler(Keyspace keyspace, EndpointsForToken 
writeReplicas, long queryStartNanoTime)
+            ReplayWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, 
long queryStartNanoTime)
             {
-                super(ReplicaLayout.forWriteWithDownNodes(keyspace, null, 
writeReplicas.token(), writeReplicas, 
EndpointsForToken.empty(writeReplicas.token())),
-                      null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
-                Iterables.addAll(undelivered, writeReplicas.endpoints());
+                super(replicaPlan, null, WriteType.UNLOGGED_BATCH, 
queryStartNanoTime);
+                Iterables.addAll(undelivered, 
replicaPlan.contacts().endpoints());
             }
 
             @Override
-            protected int totalBlockFor()
+            protected int blockFor()
             {
-                return this.replicaLayout.selected().size();
+                return this.replicaPlan.contacts().size();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 35ba198..5a4baf7 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -251,16 +251,10 @@ public enum ConsistencyLevel
         if (this == EACH_QUORUM && keyspace.getReplicationStrategy() 
instanceof NetworkTopologyStrategy)
             return filterForEachQuorum(keyspace, liveReplicas);
 
-        /*
-         * Endpoints are expected to be restricted to live replicas, sorted by 
snitch preference.
-         * For LOCAL_QUORUM, move local-DC replicas in front first as we need 
them there whether
-         * we do read repair (since the first replica gets the data read) or 
not (since we'll take
-         * the blockFor first ones).
-         */
-        if (isDCLocal)
-            liveReplicas = 
liveReplicas.sorted(DatabaseDescriptor.getLocalComparator());
-
-        return liveReplicas.subList(0, Math.min(liveReplicas.size(), 
blockFor(keyspace) + (alwaysSpeculate ? 1 : 0)));
+        int count = blockFor(keyspace) + (alwaysSpeculate ? 1 : 0);
+        return isDCLocal
+                ? liveReplicas.filter(ConsistencyLevel::isLocal, count)
+                : liveReplicas.subList(0, Math.min(liveReplicas.size(), 
count));
     }
 
     private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, 
E liveReplicas)
@@ -285,7 +279,7 @@ public enum ConsistencyLevel
         });
     }
 
-    public boolean isSufficientLiveNodesForRead(Keyspace keyspace, 
Endpoints<?> liveReplicas)
+    public boolean isSufficientLiveReplicasForRead(Keyspace keyspace, 
Endpoints<?> liveReplicas)
     {
         switch (this)
         {
@@ -316,15 +310,15 @@ public enum ConsistencyLevel
         }
     }
 
-    public void assureSufficientLiveNodesForRead(Keyspace keyspace, 
Endpoints<?> liveReplicas) throws UnavailableException
+    public void assureSufficientLiveReplicasForRead(Keyspace keyspace, 
Endpoints<?> liveReplicas) throws UnavailableException
     {
-        assureSufficientLiveNodes(keyspace, liveReplicas, blockFor(keyspace), 
1);
+        assureSufficientLiveReplicas(keyspace, liveReplicas, 
blockFor(keyspace), 1);
     }
-    public void assureSufficientLiveNodesForWrite(Keyspace keyspace, 
Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
+    public void assureSufficientLiveReplicasForWrite(Keyspace keyspace, 
Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
     {
-        assureSufficientLiveNodes(keyspace, allLive, blockForWrite(keyspace, 
pendingWithDown), 0);
+        assureSufficientLiveReplicas(keyspace, allLive, 
blockForWrite(keyspace, pendingWithDown), 0);
     }
-    public void assureSufficientLiveNodes(Keyspace keyspace, Endpoints<?> 
allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
+    void assureSufficientLiveReplicas(Keyspace keyspace, Endpoints<?> allLive, 
int blockFor, int blockForFullReplicas) throws UnavailableException
     {
         switch (this)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java 
b/src/java/org/apache/cassandra/gms/FailureDetector.java
index e567b7b..d7f73ab 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -27,11 +27,13 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.*;
 
+import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +73,8 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     }
 
     public static final IFailureDetector instance = new FailureDetector();
+    public static final Predicate<InetAddressAndPort> isEndpointAlive = 
instance::isAlive;
+    public static final Predicate<Replica> isReplicaAlive = r -> 
isEndpointAlive.test(r.endpoint());
 
     // this is useless except to provide backwards compatibility in 
phi_convict_threshold,
     // because everyone seems pretty accustomed to the default of 8, and users 
who have

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java 
b/src/java/org/apache/cassandra/hints/HintsService.java
index c6ad3d9..73840d3 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -34,6 +34,8 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -185,7 +187,7 @@ public final class HintsService implements HintsServiceMBean
         String keyspaceName = hint.mutation.getKeyspaceName();
         Token token = hint.mutation.key().getToken();
 
-        EndpointsForToken replicas = 
StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, 
token);
+        EndpointsForToken replicas = 
ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), 
token).all();
 
         // judicious use of streams: eagerly materializing probably cheaper
         // than performing filters / translations 2x extra via 
Iterables.filter/transform

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java 
b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
index 6a7a4ff..94ff991 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
@@ -75,7 +75,6 @@ public abstract class AbstractReplicaCollection<C extends 
AbstractReplicaCollect
      */
     public abstract Mutable<C> newMutable(int initialCapacity);
 
-
     public C snapshot()
     {
         return isSnapshot ? self()
@@ -83,6 +82,7 @@ public abstract class AbstractReplicaCollection<C extends 
AbstractReplicaCollect
                                                     : new ArrayList<>(list));
     }
 
+    /** see {@link ReplicaCollection#subList(int, int)}*/
     public final C subList(int start, int end)
     {
         List<Replica> subList;
@@ -100,11 +100,23 @@ public abstract class AbstractReplicaCollection<C extends 
AbstractReplicaCollect
         return snapshot(subList);
     }
 
+    /** see {@link ReplicaCollection#count(Predicate)}*/
+    public int count(Predicate<Replica> predicate)
+    {
+        int count = 0;
+        for (int i = 0 ; i < list.size() ; ++i)
+            if (predicate.test(list.get(i)))
+                ++count;
+        return count;
+    }
+
+    /** see {@link ReplicaCollection#filter(Predicate)}*/
     public final C filter(Predicate<Replica> predicate)
     {
         return filter(predicate, Integer.MAX_VALUE);
     }
 
+    /** see {@link ReplicaCollection#filter(Predicate, int)}*/
     public final C filter(Predicate<Replica> predicate, int limit)
     {
         if (isEmpty())
@@ -148,53 +160,8 @@ public abstract class AbstractReplicaCollection<C extends 
AbstractReplicaCollect
         return snapshot(copy);
     }
 
-    public final class Select
-    {
-        private final List<Replica> result;
-        public Select(int expectedSize)
-        {
-            this.result = new ArrayList<>(expectedSize);
-        }
-
-        /**
-         * Add matching replica to the result; this predicate should be 
mutually exclusive with all prior predicates.
-         * Stop once we have targetSize replicas in total, including preceding 
calls
-         */
-        public Select add(Predicate<Replica> predicate, int targetSize)
-        {
-            assert !Iterables.any(result, predicate::test);
-            for (int i = 0 ; result.size() < targetSize && i < list.size() ; 
++i)
-                if (predicate.test(list.get(i)))
-                    result.add(list.get(i));
-            return this;
-        }
-        public Select add(Predicate<Replica> predicate)
-        {
-            return add(predicate, Integer.MAX_VALUE);
-        }
-        public C get()
-        {
-            return snapshot(result);
-        }
-    }
-
-    /**
-     * An efficient method for selecting a subset of replica via a sequence of 
filters.
-     *
-     * Example: select().add(filter1).add(filter2, 3).get();
-     *
-     * @return a Select object
-     */
-    public final Select select()
-    {
-        return select(list.size());
-    }
-    public final Select select(int expectedSize)
-    {
-        return new Select(expectedSize);
-    }
-
-    public final C sorted(Comparator<Replica> comparator)
+    /** see {@link ReplicaCollection#sorted(Comparator)}*/
+   public final C sorted(Comparator<Replica> comparator)
     {
         List<Replica> copy = new ArrayList<>(list);
         copy.sort(comparator);
@@ -267,9 +234,9 @@ public abstract class AbstractReplicaCollection<C extends 
AbstractReplicaCollect
         if (replicas.isEmpty())
             return extraReplicas;
         Mutable<C> mutable = replicas.newMutable(replicas.size() + 
extraReplicas.size());
-        mutable.addAll(replicas);
+        mutable.addAll(replicas, Mutable.Conflict.NONE);
         mutable.addAll(extraReplicas, ignoreConflicts);
-        return mutable.asImmutableView();
+        return mutable.asSnapshot();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java 
b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index d168052..bad736f 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -141,33 +141,33 @@ public abstract class AbstractReplicationStrategy
      */
     public abstract EndpointsForRange calculateNaturalReplicas(Token 
searchToken, TokenMetadata tokenMetadata);
 
-    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                                                        
Runnable callback,
                                                                        
WriteType writeType,
                                                                        long 
queryStartNanoTime)
     {
-        return getWriteResponseHandler(replicaLayout, callback, writeType, 
queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
+        return getWriteResponseHandler(replicaPlan, callback, writeType, 
queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
     }
 
-    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                                                        
Runnable callback,
                                                                        
WriteType writeType,
                                                                        long 
queryStartNanoTime,
                                                                        
ConsistencyLevel idealConsistencyLevel)
     {
         AbstractWriteResponseHandler resultResponseHandler;
-        if (replicaLayout.consistencyLevel.isDatacenterLocal())
+        if (replicaPlan.consistencyLevel().isDatacenterLocal())
         {
             // block for in this context will be localnodes block.
-            resultResponseHandler = new 
DatacenterWriteResponseHandler<T>(replicaLayout, callback, writeType, 
queryStartNanoTime);
+            resultResponseHandler = new 
DatacenterWriteResponseHandler<T>(replicaPlan, callback, writeType, 
queryStartNanoTime);
         }
-        else if (replicaLayout.consistencyLevel == 
ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
+        else if (replicaPlan.consistencyLevel() == 
ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
         {
-            resultResponseHandler = new 
DatacenterSyncWriteResponseHandler<T>(replicaLayout, callback, writeType, 
queryStartNanoTime);
+            resultResponseHandler = new 
DatacenterSyncWriteResponseHandler<T>(replicaPlan, callback, writeType, 
queryStartNanoTime);
         }
         else
         {
-            resultResponseHandler = new WriteResponseHandler<T>(replicaLayout, 
callback, writeType, queryStartNanoTime);
+            resultResponseHandler = new WriteResponseHandler<T>(replicaPlan, 
callback, writeType, queryStartNanoTime);
         }
 
         //Check if tracking the ideal consistency level is configured
@@ -176,14 +176,14 @@ public abstract class AbstractReplicationStrategy
             //If ideal and requested are the same just use this handler to 
track the ideal consistency level
             //This is also used so that the ideal consistency level handler 
when constructed knows it is the ideal
             //one for tracking purposes
-            if (idealConsistencyLevel == replicaLayout.consistencyLevel)
+            if (idealConsistencyLevel == replicaPlan.consistencyLevel())
             {
                 
resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler);
             }
             else
             {
                 //Construct a delegate response handler to use to track the 
ideal consistency level
-                AbstractWriteResponseHandler idealHandler = 
getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel),
+                AbstractWriteResponseHandler idealHandler = 
getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
                                                                                
     callback,
                                                                                
     writeType,
                                                                                
     queryStartNanoTime,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/Endpoints.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java 
b/src/java/org/apache/cassandra/locator/Endpoints.java
index 3d5faa4..28e578c 100644
--- a/src/java/org/apache/cassandra/locator/Endpoints.java
+++ b/src/java/org/apache/cassandra/locator/Endpoints.java
@@ -29,6 +29,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+/**
+ * A collection of Endpoints for a given ring position.  This will typically 
reside in a ReplicaLayout,
+ * representing some subset of the endpoints for the Token or Range
+ * @param <E> The concrete type of Endpoints, that will be returned by the 
modifying methods
+ */
 public abstract class Endpoints<E extends Endpoints<E>> extends 
AbstractReplicaCollection<E>
 {
     static final Map<InetAddressAndPort, Replica> EMPTY_MAP = 
Collections.unmodifiableMap(new LinkedHashMap<>());
@@ -89,17 +94,32 @@ public abstract class Endpoints<E extends Endpoints<E>> 
extends AbstractReplicaC
         return filter(r -> !self.equals(r.endpoint()));
     }
 
+    public Replica selfIfPresent()
+    {
+        InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+        return byEndpoint().get(self);
+    }
+
+    /**
+     * @return a collection without the provided endpoints, otherwise in the 
same order as this collection
+     */
     public E without(Set<InetAddressAndPort> remove)
     {
         return filter(r -> !remove.contains(r.endpoint()));
     }
 
+    /**
+     * @return a collection with only the provided endpoints (ignoring any not 
present), otherwise in the same order as this collection
+     */
     public E keep(Set<InetAddressAndPort> keep)
     {
         return filter(r -> keep.contains(r.endpoint()));
     }
 
-    public E keep(Iterable<InetAddressAndPort> endpoints)
+    /**
+     * @return a collection containing the Replica from this collection for 
the provided endpoints, in the order of the provided endpoints
+     */
+    public E select(Iterable<InetAddressAndPort> endpoints, boolean 
ignoreMissing)
     {
         ReplicaCollection.Mutable<E> copy = newMutable(
                 endpoints instanceof Collection<?>
@@ -109,10 +129,14 @@ public abstract class Endpoints<E extends Endpoints<E>> 
extends AbstractReplicaC
         Map<InetAddressAndPort, Replica> byEndpoint = byEndpoint();
         for (InetAddressAndPort endpoint : endpoints)
         {
-            Replica keep = byEndpoint.get(endpoint);
-            if (keep == null)
+            Replica select = byEndpoint.get(endpoint);
+            if (select == null)
+            {
+                if (!ignoreMissing)
+                    throw new IllegalArgumentException(endpoint + " is not 
present in " + this);
                 continue;
-            copy.add(keep, ReplicaCollection.Mutable.Conflict.DUPLICATE);
+            }
+            copy.add(select, ReplicaCollection.Mutable.Conflict.DUPLICATE);
         }
         return copy.asSnapshot();
     }
@@ -124,34 +148,19 @@ public abstract class Endpoints<E extends Endpoints<E>> 
extends AbstractReplicaC
      *   2) because a movement that changes the type of replication from 
transient to full must be handled
      *      differently for reads and writes (with the reader treating it as 
transient, and writer as full)
      *
-     * The method haveConflicts() below, and resolveConflictsInX, are used to 
detect and resolve any issues
+     * The method {@link ReplicaLayout#haveWriteConflicts} can be used to 
detect and resolve any issues
      */
     public static <E extends Endpoints<E>> E concat(E natural, E pending)
     {
         return AbstractReplicaCollection.concat(natural, pending, 
Conflict.NONE);
     }
 
-    public static <E extends Endpoints<E>> boolean haveConflicts(E natural, E 
pending)
-    {
-        Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
-        for (InetAddressAndPort pendingEndpoint : pending.endpoints())
-        {
-            if (naturalEndpoints.contains(pendingEndpoint))
-                return true;
-        }
-        return false;
-    }
-
-    // must apply first
-    public static <E extends Endpoints<E>> E resolveConflictsInNatural(E 
natural, E pending)
-    {
-        return natural.filter(r -> !r.isTransient() || 
!pending.contains(r.endpoint(), true));
-    }
-
-    // must apply second
-    public static <E extends Endpoints<E>> E resolveConflictsInPending(E 
natural, E pending)
+    public static <E extends Endpoints<E>> E append(E replicas, Replica 
extraReplica)
     {
-        return pending.without(natural.endpoints());
+        Mutable<E> mutable = replicas.newMutable(replicas.size() + 1);
+        mutable.addAll(replicas);
+        mutable.add(extraReplica, Conflict.NONE);
+        return mutable.asSnapshot();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForRange.java 
b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
index c2d8232..f812951 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsForRange.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
@@ -86,7 +86,7 @@ public class EndpointsForRange extends 
Endpoints<EndpointsForRange>
     {
         boolean hasSnapshot;
         public Mutable(Range<Token> range) { this(range, 0); }
-        public Mutable(Range<Token> range, int capacity) { super(range, new 
ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+        public Mutable(Range<Token> range, int capacity) { super(range, new 
ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
 
         public void add(Replica replica, Conflict ignoreConflict)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForToken.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java 
b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
index f24c615..3446dc9 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsForToken.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
@@ -77,7 +77,7 @@ public class EndpointsForToken extends 
Endpoints<EndpointsForToken>
     {
         boolean hasSnapshot;
         public Mutable(Token token) { this(token, 0); }
-        public Mutable(Token token, int capacity) { super(token, new 
ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+        public Mutable(Token token, int capacity) { super(token, new 
ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
 
         public void add(Replica replica, Conflict ignoreConflict)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java 
b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 74828ad..1773173 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -173,7 +173,7 @@ public class RangesAtEndpoint extends 
AbstractReplicaCollection<RangesAtEndpoint
     {
         boolean hasSnapshot;
         public Mutable(InetAddressAndPort endpoint) { this(endpoint, 0); }
-        public Mutable(InetAddressAndPort endpoint, int capacity) { 
super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+        public Mutable(InetAddressAndPort endpoint, int capacity) { 
super(endpoint, new ArrayList<>(capacity), false, new 
LinkedHashMap<>(capacity)); }
 
         public void add(Replica replica, Conflict ignoreConflict)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java 
b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
index 6833f4b..d1006dc 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -62,6 +62,11 @@ public interface ReplicaCollection<C extends 
ReplicaCollection<C>> extends Itera
     public abstract boolean contains(Replica replica);
 
     /**
+     * @return the number of replicas that match the predicate
+     */
+    public abstract int count(Predicate<Replica> predicate);
+
+    /**
      * @return a *eagerly constructed* copy of this collection containing the 
Replica that match the provided predicate.
      * An effort will be made to either return ourself, or a subList, where 
possible.
      * It is guaranteed that no changes to any upstream Mutable will affect 
the state of the result.
@@ -108,16 +113,30 @@ public interface ReplicaCollection<C extends 
ReplicaCollection<C>> extends Itera
         C asImmutableView();
 
         /**
-         * @return an Immutable clone that assumes this Mutable will never be 
modified again.
-         * If this is not true, behaviour is undefined.
+         * @return an Immutable clone that assumes this Mutable will never be 
modified again,
+         * so its contents can be reused.
+         *
+         * This Mutable should enforce that it is no longer modified.
          */
         C asSnapshot();
 
-        enum Conflict { NONE, DUPLICATE, ALL}
+        /**
+         * Passed to add() and addAll() as ignoreConflicts parameter. The 
meaning of conflict varies by collection type
+         * (for Endpoints, it is a duplicate InetAddressAndPort; for 
RangesAtEndpoint it is a duplicate Range).
+         */
+        enum Conflict
+        {
+            /** fail on addition of any such conflict */
+            NONE,
+            /** fail on addition of any such conflict where the contents 
differ (first occurrence and position wins) */
+            DUPLICATE,
+            /** ignore all conflicts (the first occurrence and position wins) 
*/
+            ALL
+        }
 
         /**
          * @param replica add this replica to the end of the collection
-         * @param ignoreConflict if false, fail on any conflicting additions 
(as defined by C's semantics)
+         * @param ignoreConflict conflicts to ignore, see {@link Conflict}
          */
         void add(Replica replica, Conflict ignoreConflict);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java 
b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index 946a7f8..f48c989 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -18,364 +18,299 @@
 
 package org.apache.cassandra.locator;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static com.google.common.collect.Iterables.any;
+import java.util.Set;
+import java.util.function.Predicate;
 
 /**
- * Encapsulates knowledge about the ring necessary for performing a specific 
operation, with static accessors
- * for building the relevant layout.
+ * The relevant replicas for an operation over a given range or token.
  *
- * Constitutes:
- *  - the 'natural' replicas replicating the range or token relevant for the 
operation
- *  - if for performing a write, any 'pending' replicas that are taking 
ownership of the range, and must receive updates
- *  - the 'selected' replicas, those that should be targeted for any operation
- *  - 'all' replicas represents natural+pending
- *
- * @param <E> the type of Endpoints this ReplayLayout holds (either 
EndpointsForToken or EndpointsForRange)
- * @param <L> the type of itself, including its type parameters, for return 
type of modifying methods
+ * @param <E> the type of Endpoints this ReplicaLayout holds (either EndpointsForToken or EndpointsForRange)
  */
-public abstract class ReplicaLayout<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>>
+public abstract class ReplicaLayout<E extends Endpoints<E>>
 {
-    private volatile E all;
-    protected final E natural;
-    protected final E pending;
-    protected final E selected;
-
-    protected final Keyspace keyspace;
-    protected final ConsistencyLevel consistencyLevel;
-
-    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, E natural, E pending, E selected)
-    {
-        this(keyspace, consistencyLevel, natural, pending, selected, null);
-    }
+    private final E natural;
 
-    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, E natural, E pending, E selected, E all)
+    ReplicaLayout(E natural)
     {
-        assert selected != null;
-        assert pending == null || !Endpoints.haveConflicts(natural, pending);
-        this.keyspace = keyspace;
-        this.consistencyLevel = consistencyLevel;
         this.natural = natural;
-        this.pending = pending;
-        this.selected = selected;
-        // if we logically have no pending endpoints (they are null), then 
'all' our endpoints are natural
-        if (all == null && pending == null)
-            all = natural;
-        this.all = all;
     }
 
-    public Replica getReplicaFor(InetAddressAndPort endpoint)
-    {
-        return natural.byEndpoint().get(endpoint);
-    }
-
-    public E natural()
+    /**
+     * The 'natural' owners of the ring position(s), as implied by the current 
ring layout.
+     * This excludes any pending owners, i.e. those that are in the process of 
taking ownership of a range, but
+     * have not yet finished obtaining their view of the range.
+     */
+    public final E natural()
     {
         return natural;
     }
 
-    public E all()
-    {
-        E result = all;
-        if (result == null)
-            all = result = Endpoints.concat(natural, pending);
-        return result;
-    }
-
-    public E selected()
-    {
-        return selected;
-    }
-
     /**
-     * @return the pending replicas - will be null for read layouts
-     * TODO: ideally we would enforce at compile time that read layouts have 
no pending to access
+     * All relevant owners of the ring position(s) for this operation, as 
implied by the current ring layout.
+     * For writes, this will include pending owners, and for reads it will be 
equivalent to natural()
      */
-    public E pending()
-    {
-        return pending;
-    }
-
-    public int blockFor()
+    public E all()
     {
-        return pending == null
-                ? consistencyLevel.blockFor(keyspace)
-                : consistencyLevel.blockForWrite(keyspace, pending);
+        return natural;
     }
 
-    public Keyspace keyspace()
+    public String toString()
     {
-        return keyspace;
+        return "ReplicaLayout [ natural: " + natural + " ]";
     }
 
-    public ConsistencyLevel consistencyLevel()
+    public static class ForTokenRead extends ReplicaLayout<EndpointsForToken> 
implements ForToken
     {
-        return consistencyLevel;
-    }
-
-    abstract public L withSelected(E replicas);
-
-    abstract public L withConsistencyLevel(ConsistencyLevel cl);
-
-    public L forNaturalUncontacted()
-    {
-        E more;
-        if (consistencyLevel.isDatacenterLocal() && 
keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+        public ForTokenRead(EndpointsForToken natural)
         {
-            IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
-            String localDC = DatabaseDescriptor.getLocalDataCenter();
+            super(natural);
+        }
 
-            more = natural.filter(replica -> !selected.contains(replica) &&
-                    snitch.getDatacenter(replica).equals(localDC));
-        } else
+        @Override
+        public Token token()
         {
-            more = natural.filter(replica -> !selected.contains(replica));
+            return natural().token();
         }
 
-        return withSelected(more);
+        public ReplicaLayout.ForTokenRead filter(Predicate<Replica> filter)
+        {
+            EndpointsForToken filtered = natural().filter(filter);
+            // AbstractReplicaCollection.filter returns itself if all elements 
match the filter
+            if (filtered == natural()) return this;
+            return new ReplicaLayout.ForTokenRead(filtered);
+        }
     }
 
-    public static class ForRange extends ReplicaLayout<EndpointsForRange, 
ForRange>
+    public static class ForRangeRead extends ReplicaLayout<EndpointsForRange> 
implements ForRange
     {
-        public final AbstractBounds<PartitionPosition> range;
+        final AbstractBounds<PartitionPosition> range;
 
-        @VisibleForTesting
-        public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, 
AbstractBounds<PartitionPosition> range, EndpointsForRange natural, 
EndpointsForRange selected)
+        public ForRangeRead(AbstractBounds<PartitionPosition> range, 
EndpointsForRange natural)
         {
-            // Range queries do not contact pending replicas
-            super(keyspace, consistencyLevel, natural, null, selected);
+            super(natural);
             this.range = range;
         }
 
         @Override
-        public ForRange withSelected(EndpointsForRange newSelected)
+        public AbstractBounds<PartitionPosition> range()
         {
-            return new ForRange(keyspace, consistencyLevel, range, natural, 
newSelected);
+            return range;
         }
 
-        @Override
-        public ForRange withConsistencyLevel(ConsistencyLevel cl)
+        public ReplicaLayout.ForRangeRead filter(Predicate<Replica> filter)
         {
-            return new ForRange(keyspace, cl, range, natural, selected);
+            EndpointsForRange filtered = natural().filter(filter);
+            // AbstractReplicaCollection.filter returns itself if all elements 
match the filter
+            if (filtered == natural()) return this;
+            return new ReplicaLayout.ForRangeRead(range(), filtered);
         }
     }
 
-    public static class ForToken extends ReplicaLayout<EndpointsForToken, 
ForToken>
+    public static class ForWrite<E extends Endpoints<E>> extends 
ReplicaLayout<E>
     {
-        public final Token token;
+        final E all;
+        final E pending;
 
-        @VisibleForTesting
-        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, 
Token token, EndpointsForToken natural, EndpointsForToken pending, 
EndpointsForToken selected)
+        ForWrite(E natural, E pending, E all)
         {
-            super(keyspace, consistencyLevel, natural, pending, selected);
-            this.token = token;
+            super(natural);
+            assert pending != null && !haveWriteConflicts(natural, pending);
+            if (all == null)
+                all = Endpoints.concat(natural, pending);
+            this.all = all;
+            this.pending = pending;
         }
 
-        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, 
Token token, EndpointsForToken natural, EndpointsForToken pending, 
EndpointsForToken selected, EndpointsForToken all)
+        public final E all()
         {
-            super(keyspace, consistencyLevel, natural, pending, selected, all);
-            this.token = token;
+            return all;
         }
 
-        public ForToken withSelected(EndpointsForToken newSelected)
+        public final E pending()
         {
-            return new ForToken(keyspace, consistencyLevel, token, natural, 
pending, newSelected);
+            return pending;
         }
 
-        @Override
-        public ForToken withConsistencyLevel(ConsistencyLevel cl)
+        public String toString()
         {
-            return new ForToken(keyspace, cl, token, natural, pending, 
selected);
+            return "ReplicaLayout [ natural: " + natural() + ", pending: " + 
pending + " ]";
         }
     }
 
-    public static class ForPaxos extends ForToken
+    public static class ForTokenWrite extends ForWrite<EndpointsForToken> 
implements ForToken
     {
-        private final int requiredParticipants;
-
-        private ForPaxos(Keyspace keyspace, ConsistencyLevel consistencyLevel, 
Token token, int requiredParticipants, EndpointsForToken natural, 
EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+        public ForTokenWrite(EndpointsForToken natural, EndpointsForToken 
pending)
         {
-            super(keyspace, consistencyLevel, token, natural, pending, 
selected, all);
-            this.requiredParticipants = requiredParticipants;
+            this(natural, pending, null);
         }
-
-        public int getRequiredParticipants()
+        public ForTokenWrite(EndpointsForToken natural, EndpointsForToken 
pending, EndpointsForToken all)
         {
-            return requiredParticipants;
+            super(natural, pending, all);
         }
-    }
 
-    public static ForToken forSingleReplica(Keyspace keyspace, Token token, 
Replica replica)
-    {
-        EndpointsForToken singleReplica = EndpointsForToken.of(token, replica);
-        return new ForToken(keyspace, ConsistencyLevel.ONE, token, 
singleReplica, EndpointsForToken.empty(token), singleReplica, singleReplica);
-    }
-
-    public static ForRange forSingleReplica(Keyspace keyspace, 
AbstractBounds<PartitionPosition> range, Replica replica)
-    {
-        EndpointsForRange singleReplica = EndpointsForRange.of(replica);
-        return new ForRange(keyspace, ConsistencyLevel.ONE, range, 
singleReplica, singleReplica);
-    }
+        @Override
+        public Token token() { return natural().token(); }
 
-    public static ForToken forCounterWrite(Keyspace keyspace, Token token, 
Replica replica)
-    {
-        return forSingleReplica(keyspace, token, replica);
+        public ReplicaLayout.ForTokenWrite filter(Predicate<Replica> filter)
+        {
+            EndpointsForToken filtered = all().filter(filter);
+            // AbstractReplicaCollection.filter returns itself if all elements 
match the filter
+            if (filtered == all()) return this;
+            // replicas are unique by endpoint, so for efficiency we can filter on 
endpoint alone
+            return new ReplicaLayout.ForTokenWrite(
+                    natural().keep(filtered.endpoints()),
+                    pending().keep(filtered.endpoints()),
+                    filtered
+            );
+        }
     }
 
-    public static ForToken forBatchlogWrite(Keyspace keyspace, 
Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public interface ForRange
     {
-        // A single case we write not for range or token, but multiple 
mutations to many tokens
-        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
-        EndpointsForToken natural = EndpointsForToken.copyOf(token, 
SystemReplicas.getSystemReplicas(endpoints));
-        EndpointsForToken pending = EndpointsForToken.empty(token);
-        ConsistencyLevel consistencyLevel = natural.size() == 1 ? 
ConsistencyLevel.ONE : ConsistencyLevel.TWO;
-
-        return forWriteWithDownNodes(keyspace, consistencyLevel, token, 
natural, pending);
+        public AbstractBounds<PartitionPosition> range();
     }
 
-    public static ForToken forWriteWithDownNodes(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, Token token) throws UnavailableException
+    public interface ForToken
     {
-        return forWrite(keyspace, consistencyLevel, token, 
Predicates.alwaysTrue());
+        public Token token();
     }
 
-    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, Token token, Predicate<InetAddressAndPort> isAlive) throws 
UnavailableException
+    /**
+     * Gets the 'natural' and 'pending' replicas that own a given token, with 
no filtering or processing.
+     *
+     * Since a write is intended for all nodes (except, unless necessary, 
transient replicas), this method's
+     * only responsibility is to fetch the 'natural' and 'pending' replicas, 
then resolve any conflicts
+     * (see {@link ReplicaLayout#haveWriteConflicts(Endpoints, Endpoints)})
+     */
+    public static ReplicaLayout.ForTokenWrite 
forTokenWriteLiveAndDown(Keyspace keyspace, Token token)
     {
-        EndpointsForToken natural = 
StorageService.getNaturalReplicasForToken(keyspace.getName(), token);
+        // TODO: natural and pending are fetched by two separate lookups, so they can race with ring changes; implications?
+        EndpointsForToken natural = 
keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
         EndpointsForToken pending = 
StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, 
keyspace.getName());
-        return forWrite(keyspace, consistencyLevel, token, natural, pending, 
isAlive);
-    }
-
-    public static ForToken forWriteWithDownNodes(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, 
EndpointsForToken pending) throws UnavailableException
-    {
-        return forWrite(keyspace, consistencyLevel, token, natural, pending, 
Predicates.alwaysTrue());
+        return forTokenWrite(natural, pending);
     }
 
-    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken 
pending, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+    public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken 
natural, EndpointsForToken pending)
     {
-        if (Endpoints.haveConflicts(natural, pending))
+        if (haveWriteConflicts(natural, pending))
         {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
+            natural = resolveWriteConflictsInNatural(natural, pending);
+            pending = resolveWriteConflictsInPending(natural, pending);
         }
-
-        if (!any(natural, Replica::isTransient) && !any(pending, 
Replica::isTransient))
-        {
-            EndpointsForToken selected = Endpoints.concat(natural, 
pending).filter(r -> isAlive.test(r.endpoint()));
-            return new ForToken(keyspace, consistencyLevel, token, natural, 
pending, selected);
-        }
-
-        return forWrite(keyspace, consistencyLevel, token, 
consistencyLevel.blockForWrite(keyspace, pending), natural, pending, isAlive);
+        return new ReplicaLayout.ForTokenWrite(natural, pending);
     }
 
-    public static ReplicaLayout.ForPaxos forPaxos(Keyspace keyspace, 
DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws 
UnavailableException
+    /**
+     * Detect if we have any endpoint in both pending and full; this can occur 
either due to races (there is no isolation)
+     * or because an endpoint is transitioning between full and transient 
replication status.
+     *
+     * We essentially always prefer the full version for writes, because this 
is stricter.
+     *
+     * For transient->full transitions:
+     *
+     *   Since we always write to any pending transient replica, effectively 
upgrading it to full for the transition duration,
+     *   it might at first seem to be OK to continue treating the conflict 
replica as its 'natural' transient form,
+     *   as there is always a quorum of nodes receiving the write.  However, 
ring ownership changes are not atomic or
+     *   consistent across the cluster, and it is possible for writers to see 
different ring states.
+     *
+     *   Furthermore, an operator would expect that the full node has received 
all writes, with no extra need for repair
+     *   (as the normal contract dictates) when it completes its transition.
+     *
+     *   While we cannot completely eliminate risks due to ring 
inconsistencies, this approach is the most conservative
+     *   available to us today to mitigate, and (we think) the easiest to 
reason about.
+     *
+     * For full->transient transitions:
+     *
+     *   In this case, things are dicier, because in theory we can trigger 
this change instantly.  All we need to do is
+     *   drop some data, surely?
+     *
+     *   Ring movements can put us in a pickle; any other node could believe 
us to be full when we have become transient,
+     *   and perform a full data request to us that we believe ourselves 
capable of answering, but that we are not.
+     *   If the ring is inconsistent, it's even feasible that a transient 
request would be made to the node that is losing
+     *   its transient status, that also does not know it has yet done so, 
resulting in all involved nodes being unaware
+     *   of the data inconsistency.
+     *
+     *   This happens because ring ownership changes are implied by a single 
node; not all owning nodes get a say in when
+     *   the transition takes effect.  As such, a node can hold an incorrect 
belief about its own ownership ranges.
+     *
+     *   This race condition is somewhat inherent in present day Cassandra, 
and there's actually a limit to what we can do about it.
+     *   It is a little more dangerous with transient replication, however, 
because we can completely answer a request without
+     *   ever touching a digest, meaning we are less likely to attempt to 
repair any inconsistency.
+     *
+     *   We aren't guaranteed to contact any different nodes for the data 
requests, of course, though we at least have a chance.
+     *
+     * Note: If we have any pending transient->full movement, we need to move 
the full replica to our 'natural' bucket
+     * to avoid corrupting our count.  This is fine for writes, all we're 
doing is ensuring we always write to the node,
+     * instead of selectively.
+     *
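+     * @param natural the natural replicas for the token or range
+     * @param pending the pending replicas for the same token or range
+     * @param <E> the concrete Endpoints type of both collections
+     * @return true if any endpoint appears in both natural and pending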
+     */
+    static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E 
pending)
     {
-        Token tk = key.getToken();
-        EndpointsForToken natural = 
StorageService.getNaturalReplicasForToken(keyspace.getName(), tk);
-        EndpointsForToken pending = 
StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, 
keyspace.getName());
-        if (Endpoints.haveConflicts(natural, pending))
+        Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
+        for (InetAddressAndPort pendingEndpoint : pending.endpoints())
         {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
+            if (naturalEndpoints.contains(pendingEndpoint))
+                return true;
         }
-
-        // TODO CASSANDRA-14547
-        Replicas.temporaryAssertFull(natural);
-        Replicas.temporaryAssertFull(pending);
-
-        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
-        {
-            // Restrict natural and pending to node in the local DC only
-            String localDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            Predicate<Replica> isLocalDc = replica -> 
localDc.equals(snitch.getDatacenter(replica));
-
-            natural = natural.filter(isLocalDc);
-            pending = pending.filter(isLocalDc);
-        }
-
-        int participants = pending.size() + natural.size();
-        int requiredParticipants = participants / 2 + 1; // See 
CASSANDRA-8346, CASSANDRA-833
-
-        EndpointsForToken all = Endpoints.concat(natural, pending);
-        EndpointsForToken selected = all.filter(IAsyncCallback.isReplicaAlive);
-        if (selected.size() < requiredParticipants)
-            throw UnavailableException.create(consistencyForPaxos, 
requiredParticipants, selected.size());
-
-        // We cannot allow CAS operations with 2 or more pending endpoints, 
see #8346.
-        // Note that we fake an impossible number of required nodes in the 
unavailable exception
-        // to nail home the point that it's an impossible operation no matter 
how many nodes are live.
-        if (pending.size() > 1)
-            throw new UnavailableException(String.format("Cannot perform LWT 
operation as there is more than one (%d) pending range movement", 
pending.size()),
-                                           consistencyForPaxos,
-                                           participants + 1,
-                                           selected.size());
-
-        return new ReplicaLayout.ForPaxos(keyspace, consistencyForPaxos, 
key.getToken(), requiredParticipants, natural, pending, selected, all);
+        return false;
     }
 
     /**
-     * We want to send mutations to as many full replicas as we can, and just 
as many transient replicas
-     * as we need to meet blockFor.
+     * MUST APPLY FIRST
+     * See {@link ReplicaLayout#haveWriteConflicts}
+     * @return a 'natural' replica collection, that has had its conflicts with 
pending repaired
      */
-    @VisibleForTesting
-    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, Token token, int blockFor, EndpointsForToken natural, 
EndpointsForToken pending, Predicate<InetAddressAndPort> livePredicate) throws 
UnavailableException
+    private static <E extends Endpoints<E>> E resolveWriteConflictsInNatural(E 
natural, E pending)
     {
-        EndpointsForToken all = Endpoints.concat(natural, pending);
-        EndpointsForToken selected = all
-                .select()
-                .add(r -> r.isFull() && livePredicate.test(r.endpoint()))
-                .add(r -> r.isTransient() && livePredicate.test(r.endpoint()), 
blockFor)
-                .get();
-
-        consistencyLevel.assureSufficientLiveNodesForWrite(keyspace, selected, 
pending);
-
-        return new ForToken(keyspace, consistencyLevel, token, natural, 
pending, selected, all);
+        return natural.filter(r -> !r.isTransient() || 
!pending.contains(r.endpoint(), true));
     }
 
-    public static ForToken forRead(Keyspace keyspace, Token token, 
ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+    /**
+     * MUST APPLY SECOND
+     * See {@link ReplicaLayout#haveWriteConflicts}
+     * @return a 'pending' replica collection, that has had its conflicts with 
natural repaired
+     */
+    private static <E extends Endpoints<E>> E resolveWriteConflictsInPending(E 
natural, E pending)
     {
-        EndpointsForToken natural = 
StorageProxy.getLiveSortedReplicasForToken(keyspace, token);
-        EndpointsForToken selected = consistencyLevel.filterForQuery(keyspace, 
natural, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
-
-        // Throw UAE early if we don't have enough replicas.
-        consistencyLevel.assureSufficientLiveNodesForRead(keyspace, selected);
-
-        return new ForToken(keyspace, consistencyLevel, token, natural, null, 
selected);
+        return pending.without(natural.endpoints());
     }
 
-    public static ForRange forRangeRead(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange 
natural, EndpointsForRange selected)
+    /**
+     * @return the read layout for a token - this includes only live natural 
replicas, i.e. those that are not pending
+     * and not marked down by the failure detector. these are reverse sorted 
by the badness score of the configured snitch
+     */
+    static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(Keyspace 
keyspace, Token token)
     {
-        return new ForRange(keyspace, consistencyLevel, range, natural, 
selected);
+        EndpointsForToken replicas = 
keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
+        replicas = 
DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(),
 replicas);
+        replicas = replicas.filter(FailureDetector.isReplicaAlive);
+        return new ReplicaLayout.ForTokenRead(replicas);
     }
 
-    public String toString()
+    /**
+     * TODO: we should really double check that the provided range does not 
overlap multiple token ring regions
+     * @return the read layout for a range - this includes only live natural 
replicas, i.e. those that are not pending
+     * and not marked down by the failure detector. these are reverse sorted 
by the badness score of the configured snitch
+     */
+    static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(Keyspace 
keyspace, AbstractBounds<PartitionPosition> range)
     {
-        return "ReplicaLayout [ CL: " + consistencyLevel + " keyspace: " + 
keyspace + " natural: " + natural + "pending: " + pending + " selected: " + 
selected + " ]";
+        EndpointsForRange replicas = 
keyspace.getReplicationStrategy().getNaturalReplicas(range.right);
+        replicas = 
DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(),
 replicas);
+        replicas = replicas.filter(FailureDetector.isReplicaAlive);
+        return new ReplicaLayout.ForRangeRead(range, replicas);
     }
-}
 
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java 
b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
new file mode 100644
index 0000000..4d6127b
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+
+import java.util.function.Predicate;
+
+public abstract class ReplicaPlan<E extends Endpoints<E>>
+{
+    protected final Keyspace keyspace;
+    protected final ConsistencyLevel consistencyLevel;
+
+    // all nodes we will contact via any mechanism, including hints
+    // i.e., for:
+    //  - reads, only live natural replicas
+    //      ==> live.natural().subList(0, blockFor + initial speculate)
+    //  - writes, includes all full, and any pending replicas, (and only any 
necessary transient ones to make up the difference)
+    //      ==> liveAndDown.natural().filter(isFull) ++ liveAndDown.pending() 
++ live.natural.filter(isTransient, req)
+    //  - paxos, includes all live replicas (natural+pending), for this DC if 
LOCAL_SERIAL
+    //      ==> live.all()  (if consistencyLevel.isDCLocal(), then 
.filter(consistencyLevel.isLocal))
+    private final E contacts;
+
+    ReplicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, E 
contacts)
+    {
+        assert contacts != null;
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.contacts = contacts;
+    }
+
+    public abstract int blockFor();
+    public abstract void assureSufficientReplicas();
+
+    public E contacts() { return contacts; }
+    public boolean contacts(Replica replica) { return 
contacts.contains(replica); }
+    public Keyspace keyspace() { return keyspace; }
+    public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
+
+    public static abstract class ForRead<E extends Endpoints<E>> extends 
ReplicaPlan<E>
+    {
+        // all nodes we *could* contact; typically all natural replicas that 
are believed to be alive
+        // we will consult this collection to find uncontacted nodes we might 
contact if we doubt we will meet consistency level
+        private final E candidates;
+
+        ForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, E 
candidates, E contact)
+        {
+            super(keyspace, consistencyLevel, contact);
+            this.candidates = candidates;
+        }
+
+        public int blockFor() { return consistencyLevel.blockFor(keyspace); }
+        public void assureSufficientReplicas() { 
consistencyLevel.assureSufficientLiveReplicasForRead(keyspace, candidates()); }
+
+        public E candidates() { return candidates; }
+
+        public E uncontactedCandidates()
+        {
+            return candidates().filter(r -> !contacts(r));
+        }
+
+        public Replica firstUncontactedCandidate(Predicate<Replica> 
extraPredicate)
+        {
+            return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) 
&& !contacts(r)).orNull();
+        }
+
+        public Replica getReplicaFor(InetAddressAndPort endpoint)
+        {
+            return candidates().byEndpoint().get(endpoint);
+        }
+
+        public String toString()
+        {
+            return "ReplicaPlan.ForRead [ CL: " + consistencyLevel + " 
keyspace: " + keyspace + " candidates: " + candidates + " contacts: " + 
contacts() + " ]";
+        }
+    }
+
+    public static class ForTokenRead extends ForRead<EndpointsForToken>
+    {
+        public ForTokenRead(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, EndpointsForToken candidates, EndpointsForToken contact)
+        {
+            super(keyspace, consistencyLevel, candidates, contact);
+        }
+
+        ForTokenRead withContact(EndpointsForToken newContact)
+        {
+            return new ForTokenRead(keyspace, consistencyLevel, candidates(), 
newContact);
+        }
+    }
+
+    public static class ForRangeRead extends ForRead<EndpointsForRange>
+    {
+        final AbstractBounds<PartitionPosition> range;
+
+        public ForRangeRead(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange 
candidates, EndpointsForRange contact)
+        {
+            super(keyspace, consistencyLevel, candidates, contact);
+            this.range = range;
+        }
+
+        public AbstractBounds<PartitionPosition> range() { return range; }
+
+        ForRangeRead withContact(EndpointsForRange newContact)
+        {
+            return new ForRangeRead(keyspace, consistencyLevel, range, 
candidates(), newContact);
+        }
+    }
+
+    public static abstract class ForWrite<E extends Endpoints<E>> extends 
ReplicaPlan<E>
+    {
+        // TODO: this is only needed because of poor isolation of concerns 
elsewhere - we can remove it soon, and will do so in a follow-up patch
+        final E pending;
+        final E liveAndDown;
+        final E live;
+
+        ForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, E 
pending, E liveAndDown, E live, E contact)
+        {
+            super(keyspace, consistencyLevel, contact);
+            this.pending = pending;
+            this.liveAndDown = liveAndDown;
+            this.live = live;
+        }
+
+        public int blockFor() { return 
consistencyLevel.blockForWrite(keyspace, pending()); }
+        public void assureSufficientReplicas() { 
consistencyLevel.assureSufficientLiveReplicasForWrite(keyspace, live(), 
pending()); }
+
+        /** Replicas that a region of the ring is moving to; not yet ready to 
serve reads, but should receive writes */
+        public E pending() { return pending; }
+        /** Replicas that can participate in the write - this always includes 
all nodes (pending and natural) in all DCs, except for paxos LOCAL_SERIAL 
(which is local DC only) */
+        public E liveAndDown() { return liveAndDown; }
+        /** The live replicas present in liveAndDown, usually derived from 
FailureDetector.isReplicaAlive */
+        public E live() { return live; }
+        /** Calculate which live endpoints we could have contacted, but chose 
not to */
+        public E liveUncontacted() { return live().filter(r -> !contacts(r)); }
+        /** Test liveness, consistent with the upfront analysis done for this 
operation (i.e. test membership of live()) */
+        public boolean isAlive(Replica replica) { return 
live.endpoints().contains(replica.endpoint()); }
+
+        public String toString()
+        {
+            return "ReplicaPlan.ForWrite [ CL: " + consistencyLevel + " 
keyspace: " + keyspace + " liveAndDown: " + liveAndDown + " live: " + live + " 
contacts: " + contacts() +  " ]";
+        }
+    }
+
+    public static class ForTokenWrite extends ForWrite<EndpointsForToken>
+    {
+        public ForTokenWrite(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, 
EndpointsForToken live, EndpointsForToken contact)
+        {
+            super(keyspace, consistencyLevel, pending, liveAndDown, live, 
contact);
+        }
+
+        private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel 
newConsistencyLevel, EndpointsForToken newContact)
+        {
+            return new ReplicaPlan.ForTokenWrite(keyspace, 
newConsistencyLevel, pending(), liveAndDown(), live(), newContact);
+        }
+
+        ForTokenWrite withConsistencyLevel(ConsistencyLevel 
newConsistencylevel) { return copy(newConsistencylevel, contacts()); }
+        public ForTokenWrite withContact(EndpointsForToken newContact) { 
return copy(consistencyLevel, newContact); }
+    }
+
+    public static class ForPaxosWrite extends ForWrite<EndpointsForToken>
+    {
+        final int requiredParticipants;
+
+        ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, 
EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken 
live, EndpointsForToken contact, int requiredParticipants)
+        {
+            super(keyspace, consistencyLevel, pending, liveAndDown, live, 
contact);
+            this.requiredParticipants = requiredParticipants;
+        }
+
+        public int requiredParticipants() { return requiredParticipants; }
+    }
+
+    /**
+     * Used by AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair to 
share a ReplicaPlan whose 'contacts' replicas
+     * we progressively modify via various forms of speculation (initial 
speculation, rr-read and rr-write)
+     *
+     * The internal reference is not volatile, despite being shared between 
threads.  The initial reference provided to
+     * the constructor should be visible by the normal process of sharing data 
between threads (i.e. executors, etc)
+     * and any updates will either be seen or not seen, perhaps not promptly, 
but certainly not incompletely.
+     * The contained ReplicaPlan has only final member properties, so it 
cannot be seen partially initialised.
+     */
+    public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
+    {
+        /**
+         * add the provided replica to this shared plan, by updating the 
internal reference
+         */
+        public void addToContacts(Replica replica);
+        /**
+         * get the shared replica plan, non-volatile (so maybe stale) but no 
risk of partially initialised
+         */
+        public P get();
+        /**
+         * get the shared replica plan, non-volatile (so maybe stale) but no 
risk of partially initialised,
+         * but replace its 'contacts' with those provided
+         */
+        public abstract P getWithContacts(E endpoints);
+    }
+
+    public static class SharedForTokenRead implements 
Shared<EndpointsForToken, ForTokenRead>
+    {
+        private ForTokenRead replicaPlan;
+        SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = 
replicaPlan; }
+        public void addToContacts(Replica replica) { replicaPlan = 
replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
+        public ForTokenRead get() { return replicaPlan; }
+        public ForTokenRead getWithContacts(EndpointsForToken newContact) { 
return replicaPlan.withContact(newContact); }
+    }
+
+    public static class SharedForRangeRead implements 
Shared<EndpointsForRange, ForRangeRead>
+    {
+        private ForRangeRead replicaPlan;
+        SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = 
replicaPlan; }
+        public void addToContacts(Replica replica) { replicaPlan = 
replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
+        public ForRangeRead get() { return replicaPlan; }
+        public ForRangeRead getWithContacts(EndpointsForRange newContact) { 
return replicaPlan.withContact(newContact); }
+    }
+
+    public static SharedForTokenRead shared(ForTokenRead replicaPlan) { return 
new SharedForTokenRead(replicaPlan); }
+    public static SharedForRangeRead shared(ForRangeRead replicaPlan) { return 
new SharedForRangeRead(replicaPlan); }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to