Repository: cassandra
Updated Branches:
  refs/heads/trunk 05dbb3e0a -> 047bcd7ad


http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
index cf1e06a..a3f13c2 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Predicates;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.locator.EndpointsForToken;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -236,7 +236,7 @@ public class WriteResponseHandlerTest
 
     private static AbstractWriteResponseHandler 
createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long 
queryStartTime)
     {
-        return 
ks.getReplicationStrategy().getWriteResponseHandler(ReplicaLayout.forWriteWithDownNodes(ks,
 cl, targets.token(), targets, pending),
+        return 
ks.getReplicationStrategy().getWriteResponseHandler(ReplicaPlans.forWrite(ks, 
cl, targets, pending, Predicates.alwaysTrue(), ReplicaPlans.writeAll),
                                                                    null, 
WriteType.SIMPLE, queryStartTime, ideal);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
index c19e65e..b6c95dd 100644
--- 
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
+++ 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Predicate;
 
-import com.google.common.base.Predicates;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -32,10 +31,10 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.EndpointsForRange;
-import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -54,7 +53,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
-import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE;
 import static org.apache.cassandra.locator.ReplicaUtils.full;
 import static org.apache.cassandra.locator.ReplicaUtils.trans;
 
@@ -147,32 +145,35 @@ public class WriteResponseHandlerTransientTest
         dummy = 
DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0));
     }
 
-    @Ignore("Throws unavailable for quorum as written")
     @Test
     public void checkPendingReplicasAreNotFiltered()
     {
-        EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), 
full(EP1), full(EP2), trans(EP3));
-        EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), 
full(EP4), full(EP5), trans(EP6));
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(ks, 
ConsistencyLevel.QUORUM, dummy.getToken(), 2, natural, pending, 
Predicates.alwaysTrue());
+        EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), 
full(EP1), full(EP2), trans(EP3), full(EP5));
+        EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), 
full(EP4), trans(EP6));
+        ReplicaLayout.ForTokenWrite layout = new 
ReplicaLayout.ForTokenWrite(natural, pending);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(ks, 
ConsistencyLevel.QUORUM, layout, layout, ReplicaPlans.writeAll);
 
-        Assert.assertEquals(EndpointsForRange.of(full(EP4), full(EP5), 
trans(EP6)), replicaLayout.pending());
+        Assert.assertEquals(EndpointsForRange.of(full(EP4), trans(EP6)), 
replicaPlan.pending());
     }
 
-    private static ReplicaLayout.ForToken expected(EndpointsForToken all, 
EndpointsForToken selected)
+    private static ReplicaPlan.ForTokenWrite expected(EndpointsForToken 
natural, EndpointsForToken selected)
     {
-        return new ReplicaLayout.ForToken(ks, ConsistencyLevel.QUORUM, 
dummy.getToken(), all, EndpointsForToken.empty(dummy.getToken()), selected);
+        return new ReplicaPlan.ForTokenWrite(ks, ConsistencyLevel.QUORUM, 
EndpointsForToken.empty(dummy.getToken()), natural, natural, selected);
     }
 
-    private static ReplicaLayout.ForToken 
getSpeculationContext(EndpointsForToken replicas, int blockFor, 
Predicate<InetAddressAndPort> livePredicate)
+    private static ReplicaPlan.ForTokenWrite 
getSpeculationContext(EndpointsForToken natural, Predicate<InetAddressAndPort> 
livePredicate)
     {
-        return ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, 
dummy.getToken(), blockFor, replicas, 
EndpointsForToken.empty(dummy.getToken()), livePredicate);
+        ReplicaLayout.ForTokenWrite liveAndDown = new 
ReplicaLayout.ForTokenWrite(natural, EndpointsForToken.empty(dummy.getToken()));
+        ReplicaLayout.ForTokenWrite live = new 
ReplicaLayout.ForTokenWrite(natural.filter(r -> 
livePredicate.test(r.endpoint())), EndpointsForToken.empty(dummy.getToken()));
+        return ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, liveAndDown, 
live, ReplicaPlans.writeNormal);
     }
 
-    private static void assertSpeculationReplicas(ReplicaLayout.ForToken 
expected, EndpointsForToken replicas, int blockFor, 
Predicate<InetAddressAndPort> livePredicate)
+    private static void assertSpeculationReplicas(ReplicaPlan.ForTokenWrite 
expected, EndpointsForToken replicas, Predicate<InetAddressAndPort> 
livePredicate)
     {
-        ReplicaLayout.ForToken actual = getSpeculationContext(replicas, 
blockFor, livePredicate);
-        Assert.assertEquals(expected.natural(), actual.natural());
-        Assert.assertEquals(expected.selected(), actual.selected());
+        ReplicaPlan.ForTokenWrite actual = getSpeculationContext(replicas, 
livePredicate);
+        Assert.assertEquals(expected.pending(), actual.pending());
+        Assert.assertEquals(expected.live(), actual.live());
+        Assert.assertEquals(expected.contacts(), actual.contacts());
     }
 
     private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... 
endpoints)
@@ -186,39 +187,35 @@ public class WriteResponseHandlerTransientTest
         return EndpointsForToken.of(dummy.getToken(), rr);
     }
 
-    @Ignore("Throws unavailable for quorum as written")
     @Test
     public void checkSpeculationContext()
     {
-        EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3));
+        EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3), 
full(EP4), full(EP5), trans(EP6));
         // in happy path, transient replica should be classified as a backup
-        assertSpeculationReplicas(expected(all,
-                                           replicas(full(EP1), full(EP2))),
-                                  replicas(full(EP1), full(EP2), trans(EP3)),
-                                  2, dead());
-
-        // if one of the full replicas is dead, they should all be in the 
initial contacts
-        assertSpeculationReplicas(expected(all,
-                                           replicas(full(EP1), trans(EP3))),
-                                  replicas(full(EP1), full(EP2), trans(EP3)),
-                                  2, dead(EP2));
-
-        // block only for 1 full replica, use transient as backups
-        assertSpeculationReplicas(expected(all,
-                                           replicas(full(EP1))),
-                                  replicas(full(EP1), full(EP2), trans(EP3)),
-                                  1, dead(EP2));
+        assertSpeculationReplicas(expected(all, replicas(full(EP1), full(EP2), 
full(EP4), full(EP5))),
+                                  all,
+                                  dead());
+
+        // full replicas must always be in the contact list, and will occur 
first
+        assertSpeculationReplicas(expected(replicas(full(EP1), trans(EP3), 
full(EP4), trans(EP6)), replicas(full(EP1), full(EP2), full(EP4), full(EP5), 
trans(EP3), trans(EP6))),
+                                  all,
+                                  dead(EP2, EP5));
+
+        // only one transient used as backup
+        assertSpeculationReplicas(expected(replicas(full(EP1), trans(EP3), 
full(EP4), full(EP5), trans(EP6)), replicas(full(EP1), full(EP2), full(EP4), 
full(EP5), trans(EP3))),
+                all,
+                dead(EP2));
     }
 
     @Test (expected = UnavailableException.class)
     public void noFullReplicas()
     {
-        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, 
dead(EP1));
+        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 
dead(EP1));
     }
 
     @Test (expected = UnavailableException.class)
     public void notEnoughTransientReplicas()
     {
-        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, 
dead(EP2, EP3));
+        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 
dead(EP2, EP3));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java 
b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 968ef16..c49bf3a 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -87,7 +88,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     {
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
         command.trackRepairedStatus();
-        readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
+        readRepair = new TestableReadRepair(command);
     }
 
     private static EndpointsForRange makeReplicas(int num)
@@ -338,7 +339,7 @@ public class DataResolverTest extends 
AbstractReadResponseTest
     public void testResolveWithBothEmpty()
     {
         EndpointsForRange replicas = makeReplicas(2);
-        TestableReadRepair readRepair = new TestableReadRepair(command, 
ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(command);
         DataResolver resolver = new DataResolver(command, plan(replicas, 
ConsistencyLevel.ALL), readRepair, System.nanoTime());
         resolver.preprocess(response(command, replicas.get(0).endpoint(), 
EmptyIterators.unfilteredPartition(cfm)));
         resolver.preprocess(response(command, replicas.get(1).endpoint(), 
EmptyIterators.unfilteredPartition(cfm)));
@@ -715,7 +716,7 @@ public class DataResolverTest extends 
AbstractReadResponseTest
     {
         EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, 
dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd, 
ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd);
         DataResolver resolver = new DataResolver(cmd, plan(replicas, 
ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
@@ -767,7 +768,7 @@ public class DataResolverTest extends 
AbstractReadResponseTest
     {
         EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, 
dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd, 
ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd);
         DataResolver resolver = new DataResolver(cmd, plan(replicas, 
ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
@@ -811,7 +812,7 @@ public class DataResolverTest extends 
AbstractReadResponseTest
     {
         EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, 
dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd, 
ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd);
         DataResolver resolver = new DataResolver(cmd, plan(replicas, 
ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
@@ -861,7 +862,7 @@ public class DataResolverTest extends 
AbstractReadResponseTest
     {
         EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, 
dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd, 
ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd);
         DataResolver resolver = new DataResolver(cmd, plan(replicas, 
ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
@@ -1224,7 +1225,7 @@ public class DataResolverTest extends 
AbstractReadResponseTest
     }
 
     private DataResolver resolverWithVerifier(final ReadCommand command,
-                                              final ReplicaLayout.ForRange 
plan,
+                                              final 
ReplicaPlan.SharedForRangeRead plan,
                                               final ReadRepair readRepair,
                                               final long queryStartNanoTime,
                                               final RepairedDataVerifier 
verifier)
@@ -1232,7 +1233,7 @@ public class DataResolverTest extends 
AbstractReadResponseTest
         class TestableDataResolver extends DataResolver
         {
 
-            public TestableDataResolver(ReadCommand command, 
ReplicaLayout.ForRange plan, ReadRepair readRepair, long queryStartNanoTime)
+            public TestableDataResolver(ReadCommand command, 
ReplicaPlan.SharedForRangeRead plan, ReadRepair readRepair, long 
queryStartNanoTime)
             {
                 super(command, plan, readRepair, queryStartNanoTime);
             }
@@ -1298,9 +1299,9 @@ public class DataResolverTest extends 
AbstractReadResponseTest
         assertEquals(update.metadata().name, cfm.name);
     }
 
-    private ReplicaLayout.ForRange plan(EndpointsForRange replicas, 
ConsistencyLevel consistencyLevel)
+    private ReplicaPlan.SharedForRangeRead plan(EndpointsForRange replicas, 
ConsistencyLevel consistencyLevel)
     {
-        return new ReplicaLayout.ForRange(ks, consistencyLevel, 
ReplicaUtils.FULL_BOUNDS, replicas, replicas);
+        return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, 
consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas));
     }
 
     private static void resolveAndConsume(DataResolver resolver)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java 
b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
new file mode 100644
index 0000000..456cec4
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.SimpleBuilders;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.reads.repair.TestableReadRepair;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.apache.cassandra.locator.ReplicaUtils.trans;
+
+/**
+ * Tests DataResolvers handing of transient replicas
+ */
+public class DataResolverTransientTest extends AbstractReadResponseTest
+{
+    private static DecoratedKey key;
+
+    @Before
+    public void setUp()
+    {
+        key = Util.dk("key1");
+    }
+
+    private static PartitionUpdate.Builder update(TableMetadata metadata, 
String key, Row... rows)
+    {
+        PartitionUpdate.Builder builder = new 
PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), 
rows.length, false);
+        for (Row row: rows)
+        {
+            builder.add(row);
+        }
+        return builder;
+    }
+
+    private static PartitionUpdate.Builder update(Row... rows)
+    {
+        return update(cfm, "key1", rows);
+    }
+
+    private static Row.SimpleBuilder rowBuilder(int clustering)
+    {
+        return new SimpleBuilders.RowBuilder(cfm, 
Integer.toString(clustering));
+    }
+
+    private static Row row(long timestamp, int clustering, int value)
+    {
+        return rowBuilder(clustering).timestamp(timestamp).add("c1", 
Integer.toString(value)).build();
+    }
+
+    private static DeletionTime deletion(long timeMillis)
+    {
+        TimeUnit MILLIS = TimeUnit.MILLISECONDS;
+        return new DeletionTime(MILLIS.toMicros(timeMillis), 
Ints.checkedCast(MILLIS.toSeconds(timeMillis)));
+    }
+
+    /**
+     * Tests that the given update doesn't cause data resolver to attempt to 
repair a transient replica
+     */
+    private void assertNoTransientRepairs(PartitionUpdate update)
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key);
+        EndpointsForToken targetReplicas = 
EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair repair = new TestableReadRepair(command);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, 
ConsistencyLevel.QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP1, iter(update), false));
+        resolver.preprocess(response(command, EP2, iter(update), false));
+        resolver.preprocess(response(command, EP3, 
EmptyIterators.unfilteredPartition(update.metadata()), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+        assertPartitionsEqual(filter(iter(update)), resolver.resolve());
+        Assert.assertTrue(repair.dataWasConsumed());
+        Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty());
+    }
+
+    @Test
+    public void emptyRowRepair()
+    {
+        assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 
5)).build());
+    }
+
+    @Test
+    public void emptyPartitionDeletionRepairs()
+    {
+        PartitionUpdate.Builder builder = update();
+        builder.addPartitionDeletion(deletion(1999));
+        assertNoTransientRepairs(builder.build());
+    }
+
+    /**
+     * Partition level deletion responses shouldn't sent data to a transient 
replica
+     */
+    @Test
+    public void emptyRowDeletionRepairs()
+    {
+        PartitionUpdate.Builder builder = update();
+        builder.add(rowBuilder(1).timestamp(1999).delete().build());
+        assertNoTransientRepairs(builder.build());
+    }
+
+    @Test
+    public void emptyComplexDeletionRepair()
+    {
+
+        long[] ts = {1000, 2000};
+
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(Clustering.EMPTY);
+        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+        assertNoTransientRepairs(update(cfm2, "key", builder.build()).build());
+
+    }
+
+    @Test
+    public void emptyRangeTombstoneRepairs()
+    {
+        Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), 
Clustering.make(ByteBufferUtil.bytes("b")));
+        PartitionUpdate.Builder builder = update();
+        builder.add(new RangeTombstone(slice, deletion(2000)));
+        assertNoTransientRepairs(builder.build());
+    }
+
+    /**
+     * If the full replicas need to repair each other, repairs shouldn't be 
sent to transient replicas
+     */
+    @Test
+    public void fullRepairsIgnoreTransientReplicas()
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
+        EndpointsForToken targetReplicas = 
EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair repair = new TestableReadRepair(command);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, 
QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 
5)).build()), false));
+        resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 
4)).build()), false));
+        resolver.preprocess(response(command, EP3, 
EmptyIterators.unfilteredPartition(cfm), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+
+        consume(resolver.resolve());
+
+        Assert.assertTrue(repair.dataWasConsumed());
+
+        Assert.assertTrue(repair.sent.containsKey(EP1));
+        Assert.assertTrue(repair.sent.containsKey(EP2));
+        Assert.assertFalse(repair.sent.containsKey(EP3));
+    }
+
+    /**
+     * If the transient replica has new data, the full replicas shoould be 
repaired, the transient one should not
+     */
+    @Test
+    public void transientMismatchesRepairFullReplicas()
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
+        EndpointsForToken targetReplicas = 
EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair<?, ?> repair = new TestableReadRepair(command);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, 
QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        PartitionUpdate transData = update(row(1000, 5, 5)).build();
+        resolver.preprocess(response(command, EP1, 
EmptyIterators.unfilteredPartition(cfm), false));
+        resolver.preprocess(response(command, EP2, 
EmptyIterators.unfilteredPartition(cfm), false));
+        resolver.preprocess(response(command, EP3, iter(transData), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+
+        assertPartitionsEqual(filter(iter(transData)), resolver.resolve());
+
+        Assert.assertTrue(repair.dataWasConsumed());
+
+        assertPartitionsEqual(filter(iter(transData)), 
filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm))));
+        assertPartitionsEqual(filter(iter(transData)), 
filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm))));
+        Assert.assertFalse(repair.sent.containsKey(EP3));
+
+    }
+
+    private ReplicaPlan.SharedForTokenRead plan(EndpointsForToken replicas, 
ConsistencyLevel consistencyLevel)
+    {
+        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, 
consistencyLevel, replicas, replicas));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java 
b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 8454d6a..99101f1 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service.reads;
 
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -158,8 +159,8 @@ public class DigestResolverTest extends 
AbstractReadResponseTest
                               resolver.getData());
     }
 
-    private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, 
EndpointsForToken replicas)
+    private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel 
consistencyLevel, EndpointsForToken replicas)
     {
-        return new ReplicaLayout.ForToken(ks, consistencyLevel, 
replicas.token(), replicas, null, replicas);
+        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, 
consistencyLevel, replicas, replicas));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java 
b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 3b102f2..34be5ee 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -224,13 +225,13 @@ public class ReadExecutorTest
 
     }
 
-    private ReplicaLayout.ForToken plan(EndpointsForToken targets, 
ConsistencyLevel consistencyLevel)
+    private ReplicaPlan.ForTokenRead plan(EndpointsForToken targets, 
ConsistencyLevel consistencyLevel)
     {
         return plan(consistencyLevel, targets, targets);
     }
 
-    private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, 
EndpointsForToken natural, EndpointsForToken selected)
+    private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel, 
EndpointsForToken natural, EndpointsForToken selected)
     {
-        return new ReplicaLayout.ForToken(ks, consistencyLevel, 
natural.token(), natural, null, selected);
+        return new ReplicaPlan.ForTokenRead(ks, consistencyLevel, natural, 
selected);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 7e6ee29..5115581 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -10,6 +10,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -70,7 +71,7 @@ public abstract  class AbstractReadRepairTest
     static Replica replica2;
     static Replica replica3;
     static EndpointsForRange replicas;
-    static ReplicaLayout<?, ?> replicaLayout;
+    static ReplicaPlan.ForRead<?> replicaPlan;
 
     static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
     static DecoratedKey key;
@@ -217,7 +218,7 @@ public abstract  class AbstractReadRepairTest
         replica3 = fullReplica(target3, FULL_RANGE);
         replicas = EndpointsForRange.of(replica1, replica2, replica3);
 
-        replicaLayout = replicaLayout(ConsistencyLevel.QUORUM, replicas);
+        replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
 
         // default test values
         key  = dk(5);
@@ -245,21 +246,30 @@ public abstract  class AbstractReadRepairTest
         cfs.transientWriteLatencyNanos = 0;
     }
 
-    static ReplicaLayout.ForRange replicaLayout(EndpointsForRange replicas, 
EndpointsForRange targets)
+    static ReplicaPlan.ForRangeRead replicaPlan(ConsistencyLevel 
consistencyLevel, EndpointsForRange replicas)
     {
-        return new ReplicaLayout.ForRange(ks, ConsistencyLevel.QUORUM, 
ReplicaUtils.FULL_BOUNDS, replicas, targets);
+        return replicaPlan(ks, consistencyLevel, replicas, replicas);
     }
 
-    static ReplicaLayout.ForRange replicaLayout(ConsistencyLevel 
consistencyLevel, EndpointsForRange replicas)
+    static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, 
EndpointsForRange targets)
     {
-        return new ReplicaLayout.ForRange(ks, consistencyLevel, 
ReplicaUtils.FULL_BOUNDS, replicas, replicas);
+        return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
+    }
+    static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
+    {
+        return replicaPlan(keyspace, consistencyLevel, replicas, replicas);
+    }
+    static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, EndpointsForRange replicas, 
EndpointsForRange targets)
+    {
+        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
+                ReplicaUtils.FULL_BOUNDS, replicas, targets);
     }
 
-    public abstract InstrumentedReadRepair 
createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> 
replicaLayout, long queryStartNanoTime);
+    public abstract InstrumentedReadRepair 
createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> 
replicaPlan, long queryStartNanoTime);
 
-    public InstrumentedReadRepair 
createInstrumentedReadRepair(ReplicaLayout<?, ?> replicaLayout)
+    public InstrumentedReadRepair 
createInstrumentedReadRepair(ReplicaPlan.Shared<?, ?> replicaPlan)
     {
-        return createInstrumentedReadRepair(command, replicaLayout, 
System.nanoTime());
+        return createInstrumentedReadRepair(command, replicaPlan, 
System.nanoTime());
 
     }
 
@@ -270,7 +280,7 @@ public abstract  class AbstractReadRepairTest
     @Test
     public void readSpeculationCycle()
     {
-        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaLayout(replicas, 
EndpointsForRange.of(replica1, replica2)));
+        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, 
EndpointsForRange.of(replica1, replica2))));
         ResultConsumer consumer = new ResultConsumer();
 
         Assert.assertEquals(epSet(), repair.getReadRecipients());
@@ -289,7 +299,7 @@ public abstract  class AbstractReadRepairTest
     @Test
     public void noSpeculationRequired()
     {
-        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaLayout(replicas, 
EndpointsForRange.of(replica1, replica2)));
+        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, 
EndpointsForRange.of(replica1, replica2))));
         ResultConsumer consumer = new ResultConsumer();
 
         Assert.assertEquals(epSet(), repair.getReadRecipients());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index a4b7615..6bb1b7a 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
 
 public class BlockingReadRepairTest extends AbstractReadRepairTest
 {
-    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, 
L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, 
P extends ReplicaPlan.ForRead<E>>
+            extends BlockingPartitionRepair<E, P>
     {
-        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, 
int maxBlockFor, L replicaLayout)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, 
int maxBlockFor, P replicaPlan)
         {
-            super(Util.dk("not a real usable value"), repairs, maxBlockFor, 
replicaLayout);
+            super(Util.dk("not a real usable value"), repairs, maxBlockFor, 
replicaPlan);
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -71,22 +73,24 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         configureClass(ReadRepairStrategy.BLOCKING);
     }
 
-    private static InstrumentedReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, 
ReplicaLayout<?, ?> replicaLayout)
+    private static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+    InstrumentedReadRepairHandler<E, P> createRepairHandler(Map<Replica, 
Mutation> repairs, int maxBlockFor, P replicaPlan)
     {
-        return new InstrumentedReadRepairHandler(repairs, maxBlockFor, 
replicaLayout);
+        return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, 
replicaPlan);
     }
 
     private static InstrumentedReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
     {
         EndpointsForRange replicas = 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
-        return createRepairHandler(repairs, maxBlockFor, 
replicaLayout(replicas, replicas));
+        return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, 
replicas));
     }
 
-    private static class InstrumentedBlockingReadRepair<E extends 
Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingReadRepair<E, L> 
implements InstrumentedReadRepair<E, L>
+    private static class InstrumentedBlockingReadRepair<E extends 
Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+            extends BlockingReadRepair<E, P> implements 
InstrumentedReadRepair<E, P>
     {
-        public InstrumentedBlockingReadRepair(ReadCommand command, L 
replicaLayout, long queryStartNanoTime)
+        public InstrumentedBlockingReadRepair(ReadCommand command, 
ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
         {
-            super(command, replicaLayout, queryStartNanoTime);
+            super(command, replicaPlan, queryStartNanoTime);
         }
 
         Set<InetAddressAndPort> readCommandRecipients = new HashSet<>();
@@ -114,9 +118,9 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     }
 
     @Override
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime)
     {
-        return new InstrumentedBlockingReadRepair(command, replicaLayout, 
queryStartNanoTime);
+        return new InstrumentedBlockingReadRepair(command, replicaPlan, 
queryStartNanoTime);
     }
 
     @Test
@@ -142,8 +146,8 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         repairs.put(replica1, repair1);
         repairs.put(replica2, repair2);
 
-        ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
-        InstrumentedReadRepairHandler<?, ?> handler = 
createRepairHandler(repairs, 2, replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        InstrumentedReadRepairHandler<?, ?> handler = 
createRepairHandler(repairs, 2, replicaPlan);
 
         Assert.assertTrue(handler.mutationsSent.isEmpty());
 
@@ -221,7 +225,7 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         repairs.put(replica1, repair1);
 
         // check that the correct initial mutations are sent out
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2, replicaLayout(replicas, EndpointsForRange.of(replica1, replica2)));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2, replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)));
         handler.sendInitialRepairs();
         Assert.assertEquals(1, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1));
@@ -269,8 +273,8 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         repairs.put(remote1, mutation(cell1));
 
         EndpointsForRange participants = EndpointsForRange.of(replica1, 
replica2, remote1, remote2);
-        ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, 
ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, participants, 
participants);
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2, replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(ks, 
ConsistencyLevel.LOCAL_QUORUM, participants);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2, replicaPlan);
         handler.sendInitialRepairs();
         Assert.assertEquals(2, handler.mutationsSent.size());
         
Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index a5efe27..c64a73b 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -80,8 +81,8 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         repairs.put(replica2, repair2);
 
 
-        ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
-        DiagnosticPartitionReadRepairHandler handler = 
createRepairHandler(repairs, 2, replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        DiagnosticPartitionReadRepairHandler handler = 
createRepairHandler(repairs, 2, replicaPlan);
 
         Assert.assertTrue(handler.updatesByEp.isEmpty());
 
@@ -106,20 +107,20 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
     }
 
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime)
     {
-        return new DiagnosticBlockingRepairHandler(command, replicaLayout, 
queryStartNanoTime);
+        return new DiagnosticBlockingRepairHandler(command, replicaPlan, 
queryStartNanoTime);
     }
 
-    private static DiagnosticPartitionReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, 
ReplicaLayout<?, ?> replicaLayout)
+    private static DiagnosticPartitionReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, 
ReplicaPlan.ForRead<?> replicaPlan)
     {
-        return new DiagnosticPartitionReadRepairHandler(key, repairs, 
maxBlockFor, replicaLayout);
+        return new DiagnosticPartitionReadRepairHandler<>(key, repairs, 
maxBlockFor, replicaPlan);
     }
 
     private static DiagnosticPartitionReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
     {
         EndpointsForRange replicas = 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
-        return createRepairHandler(repairs, maxBlockFor, 
replicaLayout(replicas, replicas));
+        return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, 
replicas));
     }
 
     private static class DiagnosticBlockingRepairHandler extends 
BlockingReadRepair implements InstrumentedReadRepair
@@ -127,9 +128,9 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         private Set<InetAddressAndPort> recipients = Collections.emptySet();
         private ReadCallback readCallback = null;
 
-        DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaLayout<?, 
?> replicaLayout, long queryStartNanoTime)
+        DiagnosticBlockingRepairHandler(ReadCommand command, 
ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime)
         {
-            super(command, replicaLayout, queryStartNanoTime);
+            super(command, replicaPlan, queryStartNanoTime);
             DiagnosticEventService.instance().subscribe(ReadRepairEvent.class, 
this::onRepairEvent);
         }
 
@@ -163,13 +164,14 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         }
     }
 
-    private static class DiagnosticPartitionReadRepairHandler<E extends 
Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, 
L>
+    private static class DiagnosticPartitionReadRepairHandler<E extends 
Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+            extends BlockingPartitionRepair<E, P>
     {
         private final Map<InetAddressAndPort, String> updatesByEp = new 
HashMap<>();
 
-        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, 
Mutation> repairs, int maxBlockFor, L replicaLayout)
+        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, 
Mutation> repairs, int maxBlockFor, P replicaPlan)
         {
-            super(key, repairs, maxBlockFor, replicaLayout);
+            super(key, repairs, maxBlockFor, replicaPlan);
             
DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, 
this::onRepairEvent);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
index f3d2866..81ab07e 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
@@ -23,9 +23,11 @@ import java.util.Set;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.ReadCallback;
 
-public interface InstrumentedReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>> extends ReadRepair<E, L>
+public interface InstrumentedReadRepair<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
+        extends ReadRepair<E, P>
 {
     Set<InetAddressAndPort> getReadRecipients();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index bee5ddd..cf12265 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -36,11 +37,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
 
 public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
 {
-    private static class InstrumentedReadOnlyReadRepair<E extends 
Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadOnlyReadRepair 
implements InstrumentedReadRepair
+    private static class InstrumentedReadOnlyReadRepair<E extends 
Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+            extends ReadOnlyReadRepair implements InstrumentedReadRepair
     {
-        public InstrumentedReadOnlyReadRepair(ReadCommand command, L 
replicaLayout, long queryStartNanoTime)
+        public InstrumentedReadOnlyReadRepair(ReadCommand command, 
ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
         {
-            super(command, replicaLayout, queryStartNanoTime);
+            super(command, replicaPlan, queryStartNanoTime);
         }
 
         Set<InetAddressAndPort> readCommandRecipients = new HashSet<>();
@@ -74,24 +76,24 @@ public class ReadOnlyReadRepairTest extends 
AbstractReadRepairTest
     }
 
     @Override
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime)
     {
-        return new InstrumentedReadOnlyReadRepair(command, replicaLayout, 
queryStartNanoTime);
+        return new InstrumentedReadOnlyReadRepair(command, replicaPlan, 
queryStartNanoTime);
     }
 
     @Test
     public void getMergeListener()
     {
-        ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas);
-        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaLayout);
-        Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, 
repair.getMergeListener(replicaLayout));
+        ReplicaPlan.SharedForRangeRead replicaPlan = 
ReplicaPlan.shared(replicaPlan(replicas, replicas));
+        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaPlan);
+        Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, 
repair.getMergeListener(replicaPlan.get()));
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void repairPartitionFailure()
     {
-        ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas);
-        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaLayout);
-        repair.repairPartition(null, Collections.emptyMap(), replicaLayout);
+        ReplicaPlan.SharedForRangeRead replicaPlan = 
ReplicaPlan.shared(replicaPlan(replicas, replicas));
+        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaPlan);
+        repair.repairPartition(null, Collections.emptyMap(), 
replicaPlan.get());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index e4ba25d..b678b4d 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -70,11 +71,12 @@ public class ReadRepairTest
     static Replica target3;
     static EndpointsForRange targets;
 
-    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, 
L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, 
P extends ReplicaPlan.ForRead<E>>
+            extends BlockingPartitionRepair<E, P>
     {
-        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, 
int maxBlockFor, L replicaLayout)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, 
int maxBlockFor, P replicaPlan)
         {
-            super(Util.dk("not a valid key"), repairs, maxBlockFor, 
replicaLayout);
+            super(Util.dk("not a valid key"), repairs, maxBlockFor, 
replicaPlan);
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -166,8 +168,8 @@ public class ReadRepairTest
 
     private static InstrumentedReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, 
EndpointsForRange all, EndpointsForRange targets)
     {
-        ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, 
ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, all, targets);
-        return new InstrumentedReadRepairHandler(repairs, maxBlockFor, 
replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = 
AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, 
targets);
+        return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, 
replicaPlan);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java 
b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 2a2dec2..53964cb 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -36,28 +36,28 @@ import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;
 
-public class TestableReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public class TestableReadRepair<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>>
+        implements ReadRepair<E, P>
 {
     public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>();
 
     private final ReadCommand command;
-    private final ConsistencyLevel consistency;
 
     private boolean partitionListenerClosed = false;
     private boolean rowListenerClosed = true;
 
-    public TestableReadRepair(ReadCommand command, ConsistencyLevel 
consistency)
+    public TestableReadRepair(ReadCommand command)
     {
         this.command = command;
-        this.consistency = consistency;
     }
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(L 
endpoints)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(P 
endpoints)
     {
-        return new PartitionIteratorMergeListener(endpoints, command, 
consistency, this) {
+        return new PartitionIteratorMergeListener<E>(endpoints, command, this) 
{
             @Override
             public void close()
             {
@@ -70,7 +70,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L 
extends ReplicaLayout<
             {
                 assert rowListenerClosed;
                 rowListenerClosed = false;
-                return new RowIteratorMergeListener(partitionKey, 
columns(versions), isReversed(versions), endpoints, command, consistency, 
TestableReadRepair.this) {
+                return new RowIteratorMergeListener<E>(partitionKey, 
columns(versions), isReversed(versions), endpoints, command, 
TestableReadRepair.this) {
                     @Override
                     public void close()
                     {
@@ -83,7 +83,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L 
extends ReplicaLayout<
     }
 
     @Override
-    public void startRepair(DigestResolver<E, L> digestResolver, 
Consumer<PartitionIterator> resultConsumer)
+    public void startRepair(DigestResolver<E, P> digestResolver, 
Consumer<PartitionIterator> resultConsumer)
     {
 
     }
@@ -113,7 +113,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L 
extends ReplicaLayout<
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, L replicaLayout)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, P replicaPlan)
     {
         for (Map.Entry<Replica, Mutation> entry: mutations.entrySet())
             sent.put(entry.getKey().endpoint(), entry.getValue());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to