Repository: cassandra Updated Branches: refs/heads/trunk 05dbb3e0a -> 047bcd7ad
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java index cf1e06a..a3f13c2 100644 --- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java +++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Predicates; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.locator.EndpointsForToken; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlans; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -236,7 +236,7 @@ public class WriteResponseHandlerTest private static AbstractWriteResponseHandler createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long queryStartTime) { - return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaLayout.forWriteWithDownNodes(ks, cl, targets.token(), targets, pending), + return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaPlans.forWrite(ks, cl, targets, pending, Predicates.alwaysTrue(), ReplicaPlans.writeAll), null, WriteType.SIMPLE, queryStartTime, ideal); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java index c19e65e..b6c95dd 100644 --- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java +++ 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.UUID; import java.util.function.Predicate; -import com.google.common.base.Predicates; import com.google.common.collect.Sets; import org.apache.cassandra.dht.Murmur3Partitioner; @@ -32,10 +31,10 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.EndpointsForRange; -import org.apache.cassandra.locator.ReplicaUtils; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -54,7 +53,6 @@ import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.locator.Replica.fullReplica; import static org.apache.cassandra.locator.Replica.transientReplica; -import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE; import static org.apache.cassandra.locator.ReplicaUtils.full; import static org.apache.cassandra.locator.ReplicaUtils.trans; @@ -147,32 +145,35 @@ public class WriteResponseHandlerTransientTest dummy = DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0)); } - @Ignore("Throws unavailable for quorum as written") @Test public void checkPendingReplicasAreNotFiltered() { - EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3)); - EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), full(EP4), full(EP5), trans(EP6)); - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), 2, natural, pending, Predicates.alwaysTrue()); + EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3), full(EP5)); + EndpointsForToken pending = 
EndpointsForToken.of(dummy.getToken(), full(EP4), trans(EP6)); + ReplicaLayout.ForTokenWrite layout = new ReplicaLayout.ForTokenWrite(natural, pending); + ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, layout, layout, ReplicaPlans.writeAll); - Assert.assertEquals(EndpointsForRange.of(full(EP4), full(EP5), trans(EP6)), replicaLayout.pending()); + Assert.assertEquals(EndpointsForRange.of(full(EP4), trans(EP6)), replicaPlan.pending()); } - private static ReplicaLayout.ForToken expected(EndpointsForToken all, EndpointsForToken selected) + private static ReplicaPlan.ForTokenWrite expected(EndpointsForToken natural, EndpointsForToken selected) { - return new ReplicaLayout.ForToken(ks, ConsistencyLevel.QUORUM, dummy.getToken(), all, EndpointsForToken.empty(dummy.getToken()), selected); + return new ReplicaPlan.ForTokenWrite(ks, ConsistencyLevel.QUORUM, EndpointsForToken.empty(dummy.getToken()), natural, natural, selected); } - private static ReplicaLayout.ForToken getSpeculationContext(EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate) + private static ReplicaPlan.ForTokenWrite getSpeculationContext(EndpointsForToken natural, Predicate<InetAddressAndPort> livePredicate) { - return ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), blockFor, replicas, EndpointsForToken.empty(dummy.getToken()), livePredicate); + ReplicaLayout.ForTokenWrite liveAndDown = new ReplicaLayout.ForTokenWrite(natural, EndpointsForToken.empty(dummy.getToken())); + ReplicaLayout.ForTokenWrite live = new ReplicaLayout.ForTokenWrite(natural.filter(r -> livePredicate.test(r.endpoint())), EndpointsForToken.empty(dummy.getToken())); + return ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, liveAndDown, live, ReplicaPlans.writeNormal); } - private static void assertSpeculationReplicas(ReplicaLayout.ForToken expected, EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate) + 
private static void assertSpeculationReplicas(ReplicaPlan.ForTokenWrite expected, EndpointsForToken replicas, Predicate<InetAddressAndPort> livePredicate) { - ReplicaLayout.ForToken actual = getSpeculationContext(replicas, blockFor, livePredicate); - Assert.assertEquals(expected.natural(), actual.natural()); - Assert.assertEquals(expected.selected(), actual.selected()); + ReplicaPlan.ForTokenWrite actual = getSpeculationContext(replicas, livePredicate); + Assert.assertEquals(expected.pending(), actual.pending()); + Assert.assertEquals(expected.live(), actual.live()); + Assert.assertEquals(expected.contacts(), actual.contacts()); } private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... endpoints) @@ -186,39 +187,35 @@ public class WriteResponseHandlerTransientTest return EndpointsForToken.of(dummy.getToken(), rr); } - @Ignore("Throws unavailable for quorum as written") @Test public void checkSpeculationContext() { - EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3)); + EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3), full(EP4), full(EP5), trans(EP6)); // in happy path, transient replica should be classified as a backup - assertSpeculationReplicas(expected(all, - replicas(full(EP1), full(EP2))), - replicas(full(EP1), full(EP2), trans(EP3)), - 2, dead()); - - // if one of the full replicas is dead, they should all be in the initial contacts - assertSpeculationReplicas(expected(all, - replicas(full(EP1), trans(EP3))), - replicas(full(EP1), full(EP2), trans(EP3)), - 2, dead(EP2)); - - // block only for 1 full replica, use transient as backups - assertSpeculationReplicas(expected(all, - replicas(full(EP1))), - replicas(full(EP1), full(EP2), trans(EP3)), - 1, dead(EP2)); + assertSpeculationReplicas(expected(all, replicas(full(EP1), full(EP2), full(EP4), full(EP5))), + all, + dead()); + + // full replicas must always be in the contact list, and will occur first + assertSpeculationReplicas(expected(replicas(full(EP1), 
trans(EP3), full(EP4), trans(EP6)), replicas(full(EP1), full(EP2), full(EP4), full(EP5), trans(EP3), trans(EP6))), + all, + dead(EP2, EP5)); + + // only one transient used as backup + assertSpeculationReplicas(expected(replicas(full(EP1), trans(EP3), full(EP4), full(EP5), trans(EP6)), replicas(full(EP1), full(EP2), full(EP4), full(EP5), trans(EP3))), + all, + dead(EP2)); } @Test (expected = UnavailableException.class) public void noFullReplicas() { - getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP1)); + getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), dead(EP1)); } @Test (expected = UnavailableException.class) public void notEnoughTransientReplicas() { - getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP2, EP3)); + getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), dead(EP2, EP3)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java index 968ef16..c49bf3a 100644 --- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java @@ -24,6 +24,7 @@ import java.util.Iterator; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; +import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -87,7 +88,7 @@ public class DataResolverTest extends AbstractReadResponseTest { command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build(); command.trackRepairedStatus(); - readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); + readRepair = new TestableReadRepair(command); } private static EndpointsForRange 
makeReplicas(int num) @@ -338,7 +339,7 @@ public class DataResolverTest extends AbstractReadResponseTest public void testResolveWithBothEmpty() { EndpointsForRange replicas = makeReplicas(2); - TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); + TestableReadRepair readRepair = new TestableReadRepair(command); DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); resolver.preprocess(response(command, replicas.get(0).endpoint(), EmptyIterators.unfilteredPartition(cfm))); resolver.preprocess(response(command, replicas.get(1).endpoint(), EmptyIterators.unfilteredPartition(cfm))); @@ -715,7 +716,7 @@ public class DataResolverTest extends AbstractReadResponseTest { EndpointsForRange replicas = makeReplicas(2); ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); + TestableReadRepair readRepair = new TestableReadRepair(cmd); DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); long[] ts = {100, 200}; @@ -767,7 +768,7 @@ public class DataResolverTest extends AbstractReadResponseTest { EndpointsForRange replicas = makeReplicas(2); ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); + TestableReadRepair readRepair = new TestableReadRepair(cmd); DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); long[] ts = {100, 200}; @@ -811,7 +812,7 @@ public class DataResolverTest extends AbstractReadResponseTest { EndpointsForRange replicas = makeReplicas(2); ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); + TestableReadRepair 
readRepair = new TestableReadRepair(cmd); DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); long[] ts = {100, 200}; @@ -861,7 +862,7 @@ public class DataResolverTest extends AbstractReadResponseTest { EndpointsForRange replicas = makeReplicas(2); ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); + TestableReadRepair readRepair = new TestableReadRepair(cmd); DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime()); long[] ts = {100, 200}; @@ -1224,7 +1225,7 @@ public class DataResolverTest extends AbstractReadResponseTest } private DataResolver resolverWithVerifier(final ReadCommand command, - final ReplicaLayout.ForRange plan, + final ReplicaPlan.SharedForRangeRead plan, final ReadRepair readRepair, final long queryStartNanoTime, final RepairedDataVerifier verifier) @@ -1232,7 +1233,7 @@ public class DataResolverTest extends AbstractReadResponseTest class TestableDataResolver extends DataResolver { - public TestableDataResolver(ReadCommand command, ReplicaLayout.ForRange plan, ReadRepair readRepair, long queryStartNanoTime) + public TestableDataResolver(ReadCommand command, ReplicaPlan.SharedForRangeRead plan, ReadRepair readRepair, long queryStartNanoTime) { super(command, plan, readRepair, queryStartNanoTime); } @@ -1298,9 +1299,9 @@ public class DataResolverTest extends AbstractReadResponseTest assertEquals(update.metadata().name, cfm.name); } - private ReplicaLayout.ForRange plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel) + private ReplicaPlan.SharedForRangeRead plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel) { - return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas); + return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, consistencyLevel, 
ReplicaUtils.FULL_BOUNDS, replicas, replicas)); } private static void resolveAndConsume(DataResolver resolver) http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java new file mode 100644 index 0000000..456cec4 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + +import java.util.concurrent.TimeUnit; + +import com.google.common.primitives.Ints; + +import org.apache.cassandra.Util; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.locator.ReplicaPlan; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.SimpleBuilders; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.reads.repair.TestableReadRepair; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.db.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; +import static org.apache.cassandra.locator.ReplicaUtils.full; +import static org.apache.cassandra.locator.ReplicaUtils.trans; + +/** + * Tests DataResolver's handling of transient replicas + */ +public class DataResolverTransientTest extends AbstractReadResponseTest +{ + private static DecoratedKey key; + + @Before + public void setUp() + { + key = Util.dk("key1"); + } + + private static PartitionUpdate.Builder update(TableMetadata metadata, String key, Row... 
rows) + { + PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), rows.length, false); + for (Row row: rows) + { + builder.add(row); + } + return builder; + } + + private static PartitionUpdate.Builder update(Row... rows) + { + return update(cfm, "key1", rows); + } + + private static Row.SimpleBuilder rowBuilder(int clustering) + { + return new SimpleBuilders.RowBuilder(cfm, Integer.toString(clustering)); + } + + private static Row row(long timestamp, int clustering, int value) + { + return rowBuilder(clustering).timestamp(timestamp).add("c1", Integer.toString(value)).build(); + } + + private static DeletionTime deletion(long timeMillis) + { + TimeUnit MILLIS = TimeUnit.MILLISECONDS; + return new DeletionTime(MILLIS.toMicros(timeMillis), Ints.checkedCast(MILLIS.toSeconds(timeMillis))); + } + + /** + * Tests that the given update doesn't cause data resolver to attempt to repair a transient replica + */ + private void assertNoTransientRepairs(PartitionUpdate update) + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key); + EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); + TestableReadRepair repair = new TestableReadRepair(command); + DataResolver resolver = new DataResolver(command, plan(targetReplicas, ConsistencyLevel.QUORUM), repair, 0); + + Assert.assertFalse(resolver.isDataPresent()); + resolver.preprocess(response(command, EP1, iter(update), false)); + resolver.preprocess(response(command, EP2, iter(update), false)); + resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(update.metadata()), false)); + + Assert.assertFalse(repair.dataWasConsumed()); + assertPartitionsEqual(filter(iter(update)), resolver.resolve()); + Assert.assertTrue(repair.dataWasConsumed()); + Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty()); + } + + @Test + public 
void emptyRowRepair() + { + assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 5)).build()); + } + + @Test + public void emptyPartitionDeletionRepairs() + { + PartitionUpdate.Builder builder = update(); + builder.addPartitionDeletion(deletion(1999)); + assertNoTransientRepairs(builder.build()); + } + + /** + * Partition level deletion responses shouldn't send data to a transient replica + */ + @Test + public void emptyRowDeletionRepairs() + { + PartitionUpdate.Builder builder = update(); + builder.add(rowBuilder(1).timestamp(1999).delete().build()); + assertNoTransientRepairs(builder.build()); + } + + @Test + public void emptyComplexDeletionRepair() + { + + long[] ts = {1000, 2000}; + + Row.Builder builder = BTreeRow.unsortedBuilder(); + builder.newRow(Clustering.EMPTY); + builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); + assertNoTransientRepairs(update(cfm2, "key", builder.build()).build()); + + } + + @Test + public void emptyRangeTombstoneRepairs() + { + Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), Clustering.make(ByteBufferUtil.bytes("b"))); + PartitionUpdate.Builder builder = update(); + builder.add(new RangeTombstone(slice, deletion(2000))); + assertNoTransientRepairs(builder.build()); + } + + /** + * If the full replicas need to repair each other, repairs shouldn't be sent to transient replicas + */ + @Test + public void fullRepairsIgnoreTransientReplicas() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5)); + EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); + TestableReadRepair repair = new TestableReadRepair(command); + DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0); + + Assert.assertFalse(resolver.isDataPresent()); + resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 5)).build()), false)); + 
resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 4)).build()), false)); + resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(cfm), false)); + + Assert.assertFalse(repair.dataWasConsumed()); + + consume(resolver.resolve()); + + Assert.assertTrue(repair.dataWasConsumed()); + + Assert.assertTrue(repair.sent.containsKey(EP1)); + Assert.assertTrue(repair.sent.containsKey(EP2)); + Assert.assertFalse(repair.sent.containsKey(EP3)); + } + + /** + * If the transient replica has new data, the full replicas should be repaired, the transient one should not + */ + @Test + public void transientMismatchesRepairFullReplicas() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5)); + EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); + TestableReadRepair<?, ?> repair = new TestableReadRepair(command); + DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0); + + Assert.assertFalse(resolver.isDataPresent()); + PartitionUpdate transData = update(row(1000, 5, 5)).build(); + resolver.preprocess(response(command, EP1, EmptyIterators.unfilteredPartition(cfm), false)); + resolver.preprocess(response(command, EP2, EmptyIterators.unfilteredPartition(cfm), false)); + resolver.preprocess(response(command, EP3, iter(transData), false)); + + Assert.assertFalse(repair.dataWasConsumed()); + + assertPartitionsEqual(filter(iter(transData)), resolver.resolve()); + + Assert.assertTrue(repair.dataWasConsumed()); + + assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm)))); + assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm)))); + Assert.assertFalse(repair.sent.containsKey(EP3)); + + } + + private ReplicaPlan.SharedForTokenRead plan(EndpointsForToken replicas, ConsistencyLevel consistencyLevel) + { + return 
ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, consistencyLevel, replicas, replicas)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java index 8454d6a..99101f1 100644 --- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java +++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service.reads; +import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.Test; @@ -158,8 +159,8 @@ public class DigestResolverTest extends AbstractReadResponseTest resolver.getData()); } - private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas) + private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas) { - return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas); + return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, consistencyLevel, replicas, replicas)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java index 3b102f2..34be5ee 100644 --- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java +++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java @@ -24,6 +24,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Murmur3Partitioner; import 
org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -224,13 +225,13 @@ public class ReadExecutorTest } - private ReplicaLayout.ForToken plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel) + private ReplicaPlan.ForTokenRead plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel) { return plan(consistencyLevel, targets, targets); } - private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected) + private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected) { - return new ReplicaLayout.ForToken(ks, consistencyLevel, natural.token(), natural, null, selected); + return new ReplicaPlan.ForTokenRead(ks, consistencyLevel, natural, selected); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java index 7e6ee29..5115581 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java @@ -10,6 +10,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -70,7 +71,7 @@ public abstract class AbstractReadRepairTest static Replica replica2; static Replica replica3; static 
EndpointsForRange replicas; - static ReplicaLayout<?, ?> replicaLayout; + static ReplicaPlan.ForRead<?> replicaPlan; static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime()); static DecoratedKey key; @@ -217,7 +218,7 @@ public abstract class AbstractReadRepairTest replica3 = fullReplica(target3, FULL_RANGE); replicas = EndpointsForRange.of(replica1, replica2, replica3); - replicaLayout = replicaLayout(ConsistencyLevel.QUORUM, replicas); + replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas); // default test values key = dk(5); @@ -245,21 +246,30 @@ public abstract class AbstractReadRepairTest cfs.transientWriteLatencyNanos = 0; } - static ReplicaLayout.ForRange replicaLayout(EndpointsForRange replicas, EndpointsForRange targets) + static ReplicaPlan.ForRangeRead replicaPlan(ConsistencyLevel consistencyLevel, EndpointsForRange replicas) { - return new ReplicaLayout.ForRange(ks, ConsistencyLevel.QUORUM, ReplicaUtils.FULL_BOUNDS, replicas, targets); + return replicaPlan(ks, consistencyLevel, replicas, replicas); } - static ReplicaLayout.ForRange replicaLayout(ConsistencyLevel consistencyLevel, EndpointsForRange replicas) + static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets) { - return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas); + return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets); + } + static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas) + { + return replicaPlan(keyspace, consistencyLevel, replicas, replicas); + } + static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas, EndpointsForRange targets) + { + return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, + ReplicaUtils.FULL_BOUNDS, replicas, targets); } - public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime); + public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime); - public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaLayout<?, ?> replicaLayout) + public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaPlan.Shared<?, ?> replicaPlan) { - return createInstrumentedReadRepair(command, replicaLayout, System.nanoTime()); + return createInstrumentedReadRepair(command, replicaPlan, System.nanoTime()); } @@ -270,7 +280,7 @@ public abstract class AbstractReadRepairTest @Test public void readSpeculationCycle() { - InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout(replicas, EndpointsForRange.of(replica1, replica2))); + InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)))); ResultConsumer consumer = new ResultConsumer(); Assert.assertEquals(epSet(), repair.getReadRecipients()); @@ -289,7 +299,7 @@ public abstract class AbstractReadRepairTest @Test public void noSpeculationRequired() { - InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout(replicas, EndpointsForRange.of(replica1, replica2))); + InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)))); ResultConsumer consumer = new ResultConsumer(); Assert.assertEquals(epSet(), repair.getReadRecipients()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java index a4b7615..6bb1b7a 100644 --- 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; +import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.ReadCallback; public class BlockingReadRepairTest extends AbstractReadRepairTest { - private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L> + private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends BlockingPartitionRepair<E, P> { - public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout) + public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { - super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaLayout); + super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan); } Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>(); @@ -71,22 +73,24 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest configureClass(ReadRepairStrategy.BLOCKING); } - private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaLayout<?, ?> replicaLayout) + private static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + InstrumentedReadRepairHandler<E, P> createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { - return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout); + return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan); } private static InstrumentedReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor) { EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())); - return createRepairHandler(repairs, maxBlockFor, replicaLayout(replicas, replicas)); + return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas)); } - private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingReadRepair<E, L> implements InstrumentedReadRepair<E, L> + private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends BlockingReadRepair<E, P> implements InstrumentedReadRepair<E, P> { - public InstrumentedBlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime) + public InstrumentedBlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { - super(command, replicaLayout, queryStartNanoTime); + super(command, replicaPlan, queryStartNanoTime); } Set<InetAddressAndPort> readCommandRecipients = new HashSet<>(); @@ -114,9 +118,9 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest } @Override - public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime) + public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime) { - return new InstrumentedBlockingReadRepair(command, replicaLayout, queryStartNanoTime); + return new InstrumentedBlockingReadRepair(command, replicaPlan, queryStartNanoTime); } @Test @@ -142,8 +146,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica1, repair1); repairs.put(replica2, repair2); - ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); - InstrumentedReadRepairHandler<?, ?> 
handler = createRepairHandler(repairs, 2, replicaLayout); + ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); + InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaPlan); Assert.assertTrue(handler.mutationsSent.isEmpty()); @@ -221,7 +225,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica1, repair1); // check that the correct initial mutations are sent out - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout(replicas, EndpointsForRange.of(replica1, replica2))); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan(replicas, EndpointsForRange.of(replica1, replica2))); handler.sendInitialRepairs(); Assert.assertEquals(1, handler.mutationsSent.size()); Assert.assertTrue(handler.mutationsSent.containsKey(target1)); @@ -269,8 +273,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(remote1, mutation(cell1)); EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2); - ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, participants, participants); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout); + ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan); handler.sendInitialRepairs(); Assert.assertEquals(2, handler.mutationsSent.size()); Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java index a5efe27..c64a73b 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; +import org.apache.cassandra.locator.ReplicaPlan; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; @@ -80,8 +81,8 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica2, repair2); - ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); - DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout); + ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); + DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan); Assert.assertTrue(handler.updatesByEp.isEmpty()); @@ -106,20 +107,20 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); } - public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime) + public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime) { - return new DiagnosticBlockingRepairHandler(command, replicaLayout, queryStartNanoTime); + return new DiagnosticBlockingRepairHandler(command, replicaPlan, queryStartNanoTime); } - private static DiagnosticPartitionReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaLayout<?, ?> replicaLayout) + private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaPlan.ForRead<?> replicaPlan) { - return new DiagnosticPartitionReadRepairHandler(key, repairs, maxBlockFor, replicaLayout); + return new DiagnosticPartitionReadRepairHandler<>(key, repairs, maxBlockFor, replicaPlan); } private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor) { EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())); - return createRepairHandler(repairs, maxBlockFor, replicaLayout(replicas, replicas)); + return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas)); } private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair @@ -127,9 +128,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest private Set<InetAddressAndPort> recipients = Collections.emptySet(); private ReadCallback readCallback = null; - DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime) + DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime) { - super(command, replicaLayout, queryStartNanoTime); + super(command, replicaPlan, queryStartNanoTime); DiagnosticEventService.instance().subscribe(ReadRepairEvent.class, this::onRepairEvent); } @@ -163,13 +164,14 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest } } - private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L> + private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends 
BlockingPartitionRepair<E, P> { private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>(); - DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout) + DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { - super(key, repairs, maxBlockFor, replicaLayout); + super(key, repairs, maxBlockFor, replicaPlan); DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java index f3d2866..81ab07e 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java @@ -23,9 +23,11 @@ import java.util.Set; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.service.reads.ReadCallback; -public interface InstrumentedReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadRepair<E, L> +public interface InstrumentedReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends ReadRepair<E, P> { Set<InetAddressAndPort> getReadRecipients(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java index bee5ddd..cf12265 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -36,11 +37,12 @@ import org.apache.cassandra.service.reads.ReadCallback; public class ReadOnlyReadRepairTest extends AbstractReadRepairTest { - private static class InstrumentedReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadOnlyReadRepair implements InstrumentedReadRepair + private static class InstrumentedReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends ReadOnlyReadRepair implements InstrumentedReadRepair { - public InstrumentedReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime) + public InstrumentedReadOnlyReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { - super(command, replicaLayout, queryStartNanoTime); + super(command, replicaPlan, queryStartNanoTime); } Set<InetAddressAndPort> readCommandRecipients = new HashSet<>(); @@ -74,24 +76,24 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest } @Override - public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime) + public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime) { - return new InstrumentedReadOnlyReadRepair(command, replicaLayout, queryStartNanoTime); + return new 
InstrumentedReadOnlyReadRepair(command, replicaPlan, queryStartNanoTime); } @Test public void getMergeListener() { - ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas); - InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout); - Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, repair.getMergeListener(replicaLayout)); + ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas)); + InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan); + Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, repair.getMergeListener(replicaPlan.get())); } @Test(expected = UnsupportedOperationException.class) public void repairPartitionFailure() { - ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas); - InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout); - repair.repairPartition(null, Collections.emptyMap(), replicaLayout); + ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas)); + InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan); + repair.repairPartition(null, Collections.emptyMap(), replicaPlan.get()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java index e4ba25d..b678b4d 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.Iterables; import org.apache.cassandra.Util; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.EndpointsForRange; +import 
org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -70,11 +71,12 @@ public class ReadRepairTest static Replica target3; static EndpointsForRange targets; - private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L> + private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends BlockingPartitionRepair<E, P> { - public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout) + public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { - super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaLayout); + super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan); } Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>(); @@ -166,8 +168,8 @@ public class ReadRepairTest private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets) { - ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, all, targets); - return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout); + ReplicaPlan.ForRangeRead replicaPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets); + return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java index 
2a2dec2..53964cb 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java @@ -36,28 +36,28 @@ import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.service.reads.DigestResolver; -public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L> +public class TestableReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + implements ReadRepair<E, P> { public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>(); private final ReadCommand command; - private final ConsistencyLevel consistency; private boolean partitionListenerClosed = false; private boolean rowListenerClosed = true; - public TestableReadRepair(ReadCommand command, ConsistencyLevel consistency) + public TestableReadRepair(ReadCommand command) { this.command = command; - this.consistency = consistency; } @Override - public UnfilteredPartitionIterators.MergeListener getMergeListener(L endpoints) + public UnfilteredPartitionIterators.MergeListener getMergeListener(P endpoints) { - return new PartitionIteratorMergeListener(endpoints, command, consistency, this) { + return new PartitionIteratorMergeListener<E>(endpoints, command, this) { @Override public void close() { @@ -70,7 +70,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout< { assert rowListenerClosed; rowListenerClosed = false; - return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), endpoints, command, consistency, TestableReadRepair.this) { + return new RowIteratorMergeListener<E>(partitionKey, columns(versions), isReversed(versions), endpoints, command, 
TestableReadRepair.this) { @Override public void close() { @@ -83,7 +83,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout< } @Override - public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer) + public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer) { } @@ -113,7 +113,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout< } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan) { for (Map.Entry<Replica, Mutation> entry: mutations.entrySet()) sent.put(entry.getKey().endpoint(), entry.getValue()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org