This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0f22dab1a015cb84d9857f940de5a256bfbee083 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Thu Jul 18 14:46:06 2019 +0100 Restore monotonic read consistency guarantees patch by Benedict; reviewed by Sam Tunnicliffe for CASSANDRA-14740 --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ConsistencyLevel.java | 2 +- .../cassandra/locator/ReplicaCollection.java | 5 + .../org/apache/cassandra/locator/ReplicaPlan.java | 13 ++- .../org/apache/cassandra/locator/ReplicaPlans.java | 69 +++++++++++- .../cassandra/service/reads/DigestResolver.java | 8 +- .../cassandra/service/reads/ResponseResolver.java | 2 +- .../reads/repair/BlockingPartitionRepair.java | 38 ++++--- .../service/reads/repair/BlockingReadRepair.java | 24 ++-- .../service/reads/repair/NoopReadRepair.java | 2 +- .../service/reads/repair/ReadOnlyReadRepair.java | 2 +- .../cassandra/service/reads/repair/ReadRepair.java | 2 +- .../reads/repair/RowIteratorMergeListener.java | 123 ++++++++++++++------ .../distributed/impl/AbstractCluster.java | 20 ++++ .../distributed/test/DistributedTestBase.java | 7 +- .../cassandra/distributed/test/ReadRepairTest.java | 125 +++++++++++++++++++++ ...WritePathTest.java => SimpleReadWriteTest.java} | 4 +- .../service/reads/AbstractReadResponseTest.java | 19 +++- .../cassandra/service/reads/DataResolverTest.java | 55 ++++++--- .../reads/repair/AbstractReadRepairTest.java | 36 ++++++ .../reads/repair/BlockingReadRepairTest.java | 34 +++--- .../repair/DiagEventsBlockingReadRepairTest.java | 21 ++-- .../reads/repair/ReadOnlyReadRepairTest.java | 7 +- .../service/reads/repair/ReadRepairTest.java | 30 +++-- .../service/reads/repair/TestableReadRepair.java | 2 +- 25 files changed, 495 insertions(+), 156 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 75fae01..6efa148 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha3 + * Restore monotonic read consistency guarantees for blocking read repair (CASSANDRA-14740) * Separate exceptions for CAS write timeout exceptions caused by contention and unkown result (CASSANDRA-15350) * Fix in-jvm dtest java 11 compatibility (CASSANDRA-15463) * Remove joda time dependency (CASSANDRA-15257) diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 4973915..e685618 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -101,7 +101,7 @@ public enum ConsistencyLevel public static ObjectIntOpenHashMap<String> eachQuorumForRead(Keyspace keyspace) { NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); - ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size()); + ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(((strategy.getDatacenters().size() + 1) * 4) / 3); for (String dc : strategy.getDatacenters()) perDc.put(dc, ConsistencyLevel.localQuorumFor(keyspace, dc)); return perDc; diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java index d870316..ec671d5 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java +++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java @@ -125,6 +125,11 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera public C build(); /** + * @return an Immutable clone that assumes this Builder will be modified again + */ + public C snapshot(); + + /** * Passed to add() and addAll() as ignoreConflicts parameter. The meaning of conflict varies by collection type * (for Endpoints, it is a duplicate InetAddressAndPort; for RangesAtEndpoint it is a duplicate Range). */ diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java index 861c912..16af58a 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java @@ -52,6 +52,8 @@ public abstract class ReplicaPlan<E extends Endpoints<E>> public abstract int blockFor(); public E contacts() { return contacts; } + + // TODO: should this semantically return true if we contain the endpoint, not the exact replica? public boolean contacts(Replica replica) { return contacts.contains(replica); } public Keyspace keyspace() { return keyspace; } public ConsistencyLevel consistencyLevel() { return consistencyLevel; } @@ -72,17 +74,12 @@ public abstract class ReplicaPlan<E extends Endpoints<E>> public E candidates() { return candidates; } - public E uncontactedCandidates() - { - return candidates().filter(r -> !contacts(r)); - } - public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate) { return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull(); } - public Replica getReplicaFor(InetAddressAndPort endpoint) + public Replica lookup(InetAddressAndPort endpoint) { return candidates().byEndpoint().get(endpoint); } @@ -151,6 +148,10 @@ public abstract class ReplicaPlan<E extends Endpoints<E>> public E liveUncontacted() { return live().filter(r -> !contacts(r)); } /** Test liveness, consistent with the upfront analysis done for this operation (i.e. test membership of live()) */ public boolean isAlive(Replica replica) { return live.endpoints().contains(replica.endpoint()); } + public Replica lookup(InetAddressAndPort endpoint) + { + return liveAndDown().byEndpoint().get(endpoint); + } public String toString() { diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index a6fe53f..236706a 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -320,6 +320,11 @@ public class ReplicaPlans return result; } + public static ReplicaPlan.ForTokenWrite forReadRepair(Token token, ReplicaPlan.ForRead<?> readPlan) throws UnavailableException + { + return forWrite(readPlan.keyspace, readPlan.consistencyLevel, token, writeReadRepair(readPlan)); + } + public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException { return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector); @@ -345,7 +350,7 @@ public class ReplicaPlans public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException { - EndpointsForToken contacts = selector.select(keyspace, liveAndDown, live); + EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live); assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending()); return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts); } @@ -353,7 +358,7 @@ public class ReplicaPlans public interface Selector { <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>> - E select(Keyspace keyspace, L liveAndDown, L live); + E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live); } /** @@ -366,7 +371,7 @@ public class ReplicaPlans { @Override public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>> - E select(Keyspace keyspace, L liveAndDown, L live) + E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live) { return liveAndDown.all(); } @@ -385,7 +390,7 @@ public class ReplicaPlans { @Override public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>> - E select(Keyspace keyspace, L liveAndDown, L live) + E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live) { if (!any(liveAndDown.all(), Replica::isTransient)) return liveAndDown.all(); @@ -417,6 +422,62 @@ public class ReplicaPlans }; /** + * TODO: Transient Replication C-14404/C-14665 + * TODO: We employ this even when there is no monotonicity to guarantee, + * e.g. in case of CL.TWO, CL.ONE with speculation, etc. + * + * Construct a read-repair write plan to provide monotonicity guarantees on any data we return as part of a read. + * + * Since this is not a regular write, this is just to guarantee future reads will read this data, we select only + * the minimal number of nodes to meet the consistency level, and prefer nodes we contacted on read to minimise + * data transfer. + */ + public static Selector writeReadRepair(ReplicaPlan.ForRead<?> readPlan) + { + return new Selector() + { + @Override + public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>> + E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live) + { + assert !any(liveAndDown.all(), Replica::isTransient); + + ReplicaCollection.Builder<E> contacts = live.all().newBuilder(live.all().size()); + // add all live nodes we might write to that we have already contacted on read + contacts.addAll(filter(live.all(), r -> readPlan.contacts().endpoints().contains(r.endpoint()))); + + // finally, add sufficient nodes to achieve our consistency level + if (consistencyLevel != EACH_QUORUM) + { + int add = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - contacts.size(); + if (add > 0) + { + for (Replica replica : filter(live.all(), r -> !contacts.contains(r))) + { + contacts.add(replica); + if (--add == 0) + break; + } + } + } + else + { + ObjectIntOpenHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending()); + addToCountPerDc(requiredPerDc, contacts.snapshot(), -1); + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + for (Replica replica : filter(live.all(), r -> !contacts.contains(r))) + { + String dc = snitch.getDatacenter(replica); + if (requiredPerDc.addTo(dc, -1) >= 0) + contacts.add(replica); + } + } + return contacts.build(); + } + }; + } + + /** * Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison, * but for the paxos linearisation agreement. * diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java index cf7ec31..dbb761b 100644 --- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java @@ -54,7 +54,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea public void preprocess(Message<ReadResponse> message) { super.preprocess(message); - Replica replica = replicaPlan().getReplicaFor(message.from()); + Replica replica = replicaPlan().lookup(message.from()); if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull()) dataResponse = message; } @@ -69,7 +69,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea { return any(responses, msg -> !msg.payload.isDigestResponse() - && replicaPlan().getReplicaFor(msg.from()).isTransient()); + && replicaPlan().lookup(msg.from()).isTransient()); } public PartitionIterator getData() @@ -93,7 +93,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea // Reconcile with transient replicas for (Message<ReadResponse> response : responses) { - Replica replica = replicaPlan().getReplicaFor(response.from()); + Replica replica = replicaPlan().lookup(response.from()); if (replica.isTransient()) dataResolver.preprocess(response); } @@ -115,7 +115,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea // TODO: should also not calculate if only one full node for (Message<ReadResponse> message : snapshot) { - if (replicaPlan().getReplicaFor(message.from()).isTransient()) + if (replicaPlan().lookup(message.from()).isTransient()) continue; ByteBuffer newDigest = message.payload.digest(command); diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java index 8e15c1a..6ae19ac 100644 --- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java +++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java @@ -55,7 +55,7 @@ public abstract class ResponseResolver<E extends Endpoints<E>, P extends Replica public void preprocess(Message<ReadResponse> message) { - if (replicaPlan().getReplicaFor(message.from()).isTransient() && + if (replicaPlan().lookup(message.from()).isTransient() && message.payload.isDigestResponse()) throw new IllegalArgumentException("Digest response received from transient replica"); diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java index 01fd7f0..edcf14d 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java @@ -39,6 +39,7 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaPlan; @@ -53,33 +54,33 @@ import org.apache.cassandra.tracing.Tracing; import static org.apache.cassandra.net.Verb.*; -public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> +public class BlockingPartitionRepair extends AbstractFuture<Object> implements RequestCallback<Object> { private final DecoratedKey key; - private final P replicaPlan; + private final ReplicaPlan.ForTokenWrite writePlan; private final Map<Replica, Mutation> pendingRepairs; private final CountDownLatch latch; private final Predicate<InetAddressAndPort> shouldBlockOn; private volatile long mutationsSentTime; - public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) + public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan) { - this(key, repairs, maxBlockFor, replicaPlan, - replicaPlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue()); + this(key, repairs, writePlan, + writePlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue()); } - public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan, Predicate<InetAddressAndPort> shouldBlockOn) + public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan, Predicate<InetAddressAndPort> shouldBlockOn) { this.key = key; this.pendingRepairs = new ConcurrentHashMap<>(repairs); - this.replicaPlan = replicaPlan; + this.writePlan = writePlan; this.shouldBlockOn = shouldBlockOn; + int blockFor = writePlan.blockFor(); // here we remove empty repair mutations from the block for total, since // we're not sending them mutations - int blockFor = maxBlockFor; - for (Replica participant: replicaPlan.contacts()) + for (Replica participant : writePlan.contacts()) { // remote dcs can sometimes get involved in dc-local reads. We want to repair // them if they do, but they shouldn't interfere with blocking the client read. @@ -95,10 +96,15 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl latch = new CountDownLatch(Math.max(blockFor, 0)); } + int blockFor() + { + return writePlan.blockFor(); + } + @VisibleForTesting - long waitingOn() + int waitingOn() { - return latch.getCount(); + return (int) latch.getCount(); } @VisibleForTesting @@ -106,7 +112,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl { if (shouldBlockOn.test(from)) { - pendingRepairs.remove(replicaPlan.getReplicaFor(from)); + pendingRepairs.remove(writePlan.lookup(from)); latch.countDown(); } } @@ -199,7 +205,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit)) return; - E newCandidates = replicaPlan.uncontactedCandidates(); + EndpointsForToken newCandidates = writePlan.liveUncontacted(); if (newCandidates.isEmpty()) return; @@ -221,7 +227,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl if (mutation == null) { - mutation = BlockingReadRepairs.createRepairMutation(update, replicaPlan.consistencyLevel(), replica.endpoint(), true); + mutation = BlockingReadRepairs.createRepairMutation(update, writePlan.consistencyLevel(), replica.endpoint(), true); versionedMutations[versionIdx] = mutation; } @@ -240,7 +246,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl Keyspace getKeyspace() { - return replicaPlan.keyspace(); + return writePlan.keyspace(); } DecoratedKey getKey() @@ -250,6 +256,6 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl ConsistencyLevel getConsistency() { - return replicaPlan.consistencyLevel(); + return writePlan.consistencyLevel(); } } diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java index 764765e..fdc8b50 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java @@ -39,7 +39,6 @@ import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.tracing.Tracing; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; /** @@ -53,12 +52,10 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class); protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>(); - private final int blockFor; BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { super(command, replicaPlan, queryStartNanoTime); - this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace); } public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan) @@ -84,31 +81,34 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo @Override public void awaitWrites() { - boolean timedOut = false; - for (BlockingPartitionRepair repair: repairs) + BlockingPartitionRepair timedOut = null; + for (BlockingPartitionRepair repair : repairs) { if (!repair.awaitRepairsUntil(DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) + queryStartNanoTime, NANOSECONDS)) { - timedOut = true; + timedOut = repair; + break; } } - if (timedOut) + if (timedOut != null) { - // We got all responses, but timed out while repairing - int blockFor = replicaPlan().blockFor(); + // We got all responses, but timed out while repairing; + // pick one of the repairs to throw, as this is better than completely manufacturing the error message + int blockFor = timedOut.blockFor(); + int received = Math.min(blockFor - timedOut.waitingOn(), blockFor - 1); if (Tracing.isTracing()) Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); else logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(replicaPlan().consistencyLevel(), blockFor - 1, blockFor, true); + throw new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, true); } } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan) { - BlockingPartitionRepair<E, P> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan); + BlockingPartitionRepair blockingRepair = new BlockingPartitionRepair(partitionKey, mutations, writePlan); blockingRepair.sendInitialRepairs(); repairs.add(blockingRepair); } diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java index 6aa6ece..2f82c22 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java @@ -76,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan) { } diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java index 64bfec2..d9293fb 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java @@ -61,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan) { throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions"); } diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java index 9441945..4747651 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java @@ -93,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea * Repairs a partition _after_ receiving data responses. This method receives replica list, since * we will block repair only on the replicas that have responded. */ - void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan); + void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan); } diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java index 60e0d41..fc4c351 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java @@ -19,14 +19,17 @@ package org.apache.cassandra.service.reads.repair; import java.util.Arrays; +import java.util.BitSet; import java.util.Map; +import java.util.function.Consumer; -import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.carrotsearch.hppc.ObjectIntOpenHashMap; +import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.ClusteringBound; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.LivenessInfo; @@ -45,8 +48,10 @@ import org.apache.cassandra.db.rows.RowDiffListener; import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; import org.apache.cassandra.schema.ColumnMetadata; public class RowIteratorMergeListener<E extends Endpoints<E>> @@ -57,10 +62,14 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> private final boolean isReversed; private final ReadCommand command; + private final BitSet writeBackTo; + private final boolean buildFullDiff; + /** the repairs we will send to each source, suffixed by a complete repair of all differences, if {@link #buildFullDiff} */ private final PartitionUpdate.Builder[] repairs; private final Row.Builder[] currentRows; private final RowDiffListener diffListener; - private final ReplicaPlan.ForRead<E> replicaPlan; + private final ReplicaPlan.ForRead<E> readPlan; + private final ReplicaPlan.ForTokenWrite writePlan; // The partition level deletion for the merge row. private DeletionTime partitionLevelDeletion; @@ -73,17 +82,36 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> private final ReadRepair readRepair; - public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair) + public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> readPlan, ReadCommand command, ReadRepair readRepair) { this.partitionKey = partitionKey; this.columns = columns; this.isReversed = isReversed; - this.replicaPlan = replicaPlan; - int size = replicaPlan.contacts().size(); - repairs = new PartitionUpdate.Builder[size]; - currentRows = new Row.Builder[size]; - sourceDeletionTime = new DeletionTime[size]; - markerToRepair = new ClusteringBound[size]; + this.readPlan = readPlan; + this.writePlan = ReplicaPlans.forReadRepair(partitionKey.getToken(), readPlan); + + int size = readPlan.contacts().size(); + this.writeBackTo = new BitSet(size); + { + int i = 0; + for (Replica replica : readPlan.contacts()) + { + if (writePlan.contacts().endpoints().contains(replica.endpoint())) + writeBackTo.set(i); + ++i; + } + } + // If we are contacting any nodes we didn't read from, we are likely handling a range movement. + // In this case we need to send all differences to these nodes, as we do not (with present design) know which + // node they bootstrapped from, and so which data we need to duplicate. + // In reality, there will be situations where we are simply sending the same number of writes to different nodes + // and in this case we could probably avoid building a full difference, and only ensure each write makes it to + // some other node, but it is probably not worth special casing this scenario. + this.buildFullDiff = Iterables.any(writePlan.contacts().endpoints(), e -> !readPlan.contacts().endpoints().contains(e)); + this.repairs = new PartitionUpdate.Builder[size + (buildFullDiff ? 1 : 0)]; + this.currentRows = new Row.Builder[size]; + this.sourceDeletionTime = new DeletionTime[size]; + this.markerToRepair = new ClusteringBound[size]; this.command = command; this.readRepair = readRepair; @@ -128,13 +156,6 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> }; } - private PartitionUpdate.Builder update(int i) - { - if (repairs[i] == null) - repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1); - return repairs[i]; - } - /** * The partition level deletion with with which source {@code i} is currently repaired, or * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was @@ -156,13 +177,30 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> return currentRows[i]; } + @Inline + private void applyToPartition(int i, Consumer<PartitionUpdate.Builder> f) + { + if (writeBackTo.get(i)) + { + if (repairs[i] == null) + repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1); + f.accept(repairs[i]); + } + if (buildFullDiff) + { + if (repairs[repairs.length - 1] == null) + repairs[repairs.length - 1] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1); + f.accept(repairs[repairs.length - 1]); + } + } + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) { this.partitionLevelDeletion = mergedDeletion; for (int i = 0; i < versions.length; i++) { if (mergedDeletion.supersedes(versions[i])) - update(i).addPartitionDeletion(mergedDeletion); + applyToPartition(i, p -> p.addPartitionDeletion(mergedDeletion)); } } @@ -178,7 +216,10 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> for (int i = 0; i < currentRows.length; i++) { if (currentRows[i] != null) - update(i).add(currentRows[i].build()); + { + Row row = currentRows[i].build(); + applyToPartition(i, p -> p.add(row)); + } } Arrays.fill(currentRows, null); } @@ -302,34 +343,44 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> private void closeOpenMarker(int i, ClusteringBound close) { ClusteringBound open = markerToRepair[i]; - update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion())); + RangeTombstone rt = new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()); + applyToPartition(i, p -> p.add(rt)); markerToRepair[i] = null; } public void close() { - Map<Replica, Mutation> mutations = null; - Endpoints<?> sources = replicaPlan.contacts(); - for (int i = 0; i < repairs.length; i++) - { - if (repairs[i] == null) - continue; + boolean hasRepairs = false; + for (int i = 0 ; !hasRepairs && i < repairs.length ; ++i) + hasRepairs = repairs[i] != null; + if (!hasRepairs) + return; + + PartitionUpdate fullDiffRepair = null; + if (buildFullDiff && repairs[repairs.length - 1] != null) + fullDiffRepair = repairs[repairs.length - 1].build(); - Replica source = sources.get(i); + Map<Replica, Mutation> mutations = Maps.newHashMapWithExpectedSize(writePlan.contacts().size()); + ObjectIntOpenHashMap<InetAddressAndPort> sourceIds = new ObjectIntOpenHashMap<>(((repairs.length + 1) * 4) / 3); + for (int i = 0 ; i < readPlan.contacts().size() ; ++i) + sourceIds.put(readPlan.contacts().get(i).endpoint(), 1 + i); - Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), replicaPlan.consistencyLevel(), source.endpoint(), false); + for (Replica replica : writePlan.contacts()) + { + PartitionUpdate update = null; + int i = -1 + sourceIds.get(replica.endpoint()); + if (i < 0) + update = fullDiffRepair; + else if (repairs[i] != null) + update = repairs[i].build(); + + Mutation mutation = BlockingReadRepairs.createRepairMutation(update, readPlan.consistencyLevel(), replica.endpoint(), false); if (mutation == null) continue; - if (mutations == null) - mutations = Maps.newHashMapWithExpectedSize(sources.size()); - - mutations.put(source, mutation); + mutations.put(replica, mutation); } - if (mutations != null) - { - readRepair.repairPartition(partitionKey, mutations, replicaPlan); - } + readRepair.repairPartition(partitionKey, mutations, writePlan); } } \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 68cc6a2..d487c14 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@ -47,6 +47,8 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; @@ -766,5 +768,23 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, } } + public List<Token> tokens() + { + return stream() + .map(i -> + { + try + { + IPartitioner partitioner = ((IPartitioner)Class.forName(i.config().getString("partitioner")).newInstance()); + return partitioner.getTokenFactory().fromString(i.config().getString("initial_token")); + } + catch (Throwable t) + { + throw new RuntimeException(t); + } + }) + .collect(Collectors.toList()); + } + } diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java index 64dee64..65224ea 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java @@ -78,7 +78,12 @@ public class DistributedTestBase protected static <C extends AbstractCluster<?>> C init(C cluster) { - cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};"); + return init(cluster, Math.min(3, cluster.size())); + } + + protected static <C extends AbstractCluster<?>> C init(C cluster, int replicationFactor) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};"); return cluster; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java new file mode 100644 index 0000000..a8c5fa1 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.util.List; + +import org.junit.Test; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.StorageService; + +import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ; +import static org.apache.cassandra.net.Verb.READ_REQ; + +public class ReadRepairTest extends DistributedTestBase +{ + + @Test + public void readRepairTest() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, 1)); + + // Verify that data got repaired to the third node + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, 1)); + } + } + + @Test + public void failingReadRepairTest() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + + for (int i = 1 ; i <= 2 ; ++i) + cluster.get(i).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + + cluster.verbs(READ_REPAIR_REQ).to(3).drop(); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.QUORUM), + row(1, 1, 1)); + + // Data was not repaired + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + } + } + + @Test + public void movingTokenReadRepairTest() throws Throwable + { + try (Cluster cluster = init(Cluster.create(4), 3)) + { + List<Token> tokens = cluster.tokens(); + + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + + int i = 0; + while (true) + { + Token t = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(i)); + if (t.compareTo(tokens.get(2 - 1)) < 0 && t.compareTo(tokens.get(1 - 1)) > 0) + break; + ++i; + } + + // write only to #4 + cluster.get(4).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1)", i); + // mark #2 as leaving in #4 + cluster.get(4).acceptsOnInstance((InetAddressAndPort endpoint) -> { + StorageService.instance.getTokenMetadata().addLeavingEndpoint(endpoint); + PendingRangeCalculatorService.instance.update(); + PendingRangeCalculatorService.instance.blockUntilFinished(); + }).accept(cluster.get(2).broadcastAddressAndPort()); + + // prevent #4 from reading or writing to #3, so our QUORUM must contain #2 and #4 + // since #1 is taking over the range, this means any read-repair must make it to #1 as well + cluster.filters().verbs(READ_REQ.ordinal()).from(4).to(3).drop(); + cluster.filters().verbs(READ_REPAIR_REQ.ordinal()).from(4).to(3).drop(); + assertRows(cluster.coordinator(4).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, i), + row(i, 1, 1)); + + // verify that #1 receives the write + assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", i), + row(i, 1, 1)); + } + } + +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java similarity index 99% rename from test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java rename to test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java index 679a90f..4292598 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@ -18,8 +18,6 @@ package org.apache.cassandra.distributed.test; -import java.util.concurrent.TimeUnit; - import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -38,7 +36,7 @@ import static org.junit.Assert.assertEquals; import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ; import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD; -public class DistributedReadWritePathTest extends DistributedTestBase +public class SimpleReadWriteTest extends DistributedTestBase { @BeforeClass public static void before() diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java index 545731b..884baa1 100644 --- a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java +++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java @@ -82,14 +82,18 @@ import static org.apache.cassandra.net.Verb.READ_REQ; public abstract class AbstractReadResponseTest { public static final String KEYSPACE1 = "DataResolverTest"; + public static final String KEYSPACE3 = "DataResolverTest3"; public static final String CF_STANDARD = "Standard1"; public static final String CF_COLLECTION = "Collection1"; public static Keyspace ks; + public static Keyspace ks3; public static ColumnFamilyStore cfs; public static ColumnFamilyStore cfs2; + public static ColumnFamilyStore cfs3; public static TableMetadata cfm; public static TableMetadata cfm2; + public static TableMetadata cfm3; public static ColumnMetadata m; public static DecoratedKey dk; @@ -128,19 +132,32 @@ public abstract class AbstractReadResponseTest .addRegularColumn("one", AsciiType.instance) .addRegularColumn("two", AsciiType.instance); + TableMetadata.Builder builder3 = + TableMetadata.builder(KEYSPACE3, CF_STANDARD) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col1", AsciiType.instance) + .addRegularColumn("c1", AsciiType.instance) + .addRegularColumn("c2", AsciiType.instance) + .addRegularColumn("one", AsciiType.instance) + .addRegularColumn("two", AsciiType.instance); + TableMetadata.Builder builder2 = TableMetadata.builder(KEYSPACE1, CF_COLLECTION) .addPartitionKeyColumn("k", ByteType.instance) .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true)); SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2); + SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(2), builder1, builder2); + SchemaLoader.createKeyspace(KEYSPACE3, KeyspaceParams.simple(4), builder3); ks = Keyspace.open(KEYSPACE1); cfs = ks.getColumnFamilyStore(CF_STANDARD); cfm = cfs.metadata(); cfs2 = ks.getColumnFamilyStore(CF_COLLECTION); cfm2 = cfs2.metadata(); + ks3 = Keyspace.open(KEYSPACE3); + cfs3 = ks3.getColumnFamilyStore(CF_STANDARD); + cfm3 = cfs3.metadata(); m = cfm2.getColumn(new ColumnIdentifier("m", false)); } diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java index 17c6e41..50ed09d 100644 --- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java @@ -21,9 +21,17 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Iterator; +import java.util.UUID; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.Before; @@ -59,6 +67,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.*; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.reads.repair.ReadRepair; import org.apache.cassandra.service.reads.repair.RepairedDataTracker; import org.apache.cassandra.service.reads.repair.RepairedDataVerifier; @@ -76,28 +85,42 @@ import static org.junit.Assert.assertTrue; public class DataResolverTest extends AbstractReadResponseTest { - public static final String KEYSPACE1 = "DataResolverTest"; - public static final String CF_STANDARD = "Standard1"; - private ReadCommand command; private TestableReadRepair readRepair; + private Keyspace ks; + private ColumnFamilyStore cfs; - @Before - public void setup() + private EndpointsForRange makeReplicas(int num) { + StorageService.instance.getTokenMetadata().clearUnsafe(); + + switch (num) + { + case 2: + ks = AbstractReadResponseTest.ks; + cfs = AbstractReadResponseTest.cfs; + break; + case 4: + ks = AbstractReadResponseTest.ks3; + cfs = AbstractReadResponseTest.cfs3; + break; + default: + throw new IllegalStateException("This test needs refactoring to cleanly support different replication factors"); + } + command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build(); command.trackRepairedStatus(); readRepair = new TestableReadRepair(command); - } - - private static EndpointsForRange makeReplicas(int num) - { + Token token = Murmur3Partitioner.instance.getMinimumToken(); EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num); for (int i = 0; i < num; i++) { try { - replicas.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) }))); + InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) }); + replicas.add(ReplicaUtils.full(endpoint)); + StorageService.instance.getTokenMetadata().updateNormalToken(token = token.increaseSlightly(), endpoint); + Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1); } catch (UnknownHostException e) { @@ -231,30 +254,30 @@ public class DataResolverTest extends AbstractReadResponseTest RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec); RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec); - PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1) + PartitionUpdate update = new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1) .addRangeTombstone(tombstone2) .buildUpdate(); InetAddressAndPort peer1 = replicas.get(0).endpoint(); - UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1) + UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1) .addRangeTombstone(tombstone2) .buildUpdate()); resolver.preprocess(response(command, peer1, iter1)); // not covered by any range tombstone InetAddressAndPort peer2 = replicas.get(1).endpoint(); - UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0") + UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("0") .add("c1", "v0") .buildUpdate()); resolver.preprocess(response(command, peer2, iter2)); // covered by a range tombstone InetAddressAndPort peer3 = replicas.get(2).endpoint(); - UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10") + UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("10") .add("c2", "v1") .buildUpdate()); resolver.preprocess(response(command, peer3, iter3)); // range covered by rt, but newer InetAddressAndPort peer4 = replicas.get(3).endpoint(); - UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3") + UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm3, nowInSec, 2L, dk).clustering("3") .add("one", "A") .buildUpdate()); resolver.preprocess(response(command, peer4, iter4)); @@ -1294,7 +1317,7 @@ public class DataResolverTest extends AbstractReadResponseTest private void assertRepairMetadata(Mutation mutation) { PartitionUpdate update = mutation.getPartitionUpdates().iterator().next(); - assertEquals(update.metadata().keyspace, cfm.keyspace); + assertEquals(update.metadata().keyspace, ks.getName()); assertEquals(update.metadata().name, cfm.name); } diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java index 3d39732..ed4ef3f 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java @@ -1,14 +1,24 @@ package org.apache.cassandra.service.reads.repair; +import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; + +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.Before; @@ -41,6 +51,7 @@ import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlans; import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.Message; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -48,6 +59,7 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.MigrationManager; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Tables; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.locator.Replica.fullReplica; @@ -216,6 +228,14 @@ public abstract class AbstractReadRepairTest replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas); + StorageService.instance.getTokenMetadata().clearUnsafe(); + StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint()); + StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint()); + StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint()); + Gossiper.instance.initializeNodeUnsafe(replica1.endpoint(), UUID.randomUUID(), 1); + Gossiper.instance.initializeNodeUnsafe(replica2.endpoint(), UUID.randomUUID(), 1); + Gossiper.instance.initializeNodeUnsafe(replica3.endpoint(), UUID.randomUUID(), 1); + // default test values key = dk(5); cell1 = cell("v", "val1", now); @@ -247,6 +267,22 @@ public abstract class AbstractReadRepairTest return replicaPlan(ks, consistencyLevel, replicas, replicas); } + static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan) + { + return repairPlan(readPlan, readPlan.candidates()); + } + + static ReplicaPlan.ForTokenWrite repairPlan(EndpointsForRange liveAndDown, EndpointsForRange targets) + { + return repairPlan(replicaPlan(liveAndDown, targets), liveAndDown); + } + + static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown) + { + Token token = readPlan.range().left.getToken(); + EndpointsForToken pending = EndpointsForToken.empty(token); + return ReplicaPlans.forWrite(ks, ConsistencyLevel.TWO, liveAndDown.forToken(token), pending, Predicates.alwaysTrue(), ReplicaPlans.writeReadRepair(readPlan)); + } static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets) { return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets); diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java index 3cc1a63..e4b3a71 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java @@ -44,13 +44,12 @@ import org.apache.cassandra.service.reads.ReadCallback; public class BlockingReadRepairTest extends AbstractReadRepairTest { - private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> - extends BlockingPartitionRepair<E, P> + private static class InstrumentedReadRepairHandler + extends BlockingPartitionRepair { - public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) + public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan) { - super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan, - e -> targets.contains(e)); + super(Util.dk("not a real usable value"), repairs, writePlan, e -> targets.contains(e)); } Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>(); @@ -67,16 +66,15 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest configureClass(ReadRepairStrategy.BLOCKING); } - private static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> - InstrumentedReadRepairHandler<E, P> createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) + private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan) { - return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan); + return new InstrumentedReadRepairHandler(repairs, writePlan); } - private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor) + private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs) { EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())); - return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas)); + return createRepairHandler(repairs, repairPlan(replicas, replicas)); } private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> @@ -140,8 +138,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica1, repair1); repairs.put(replica2, repair2); - ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); - InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaPlan); + ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan); Assert.assertTrue(handler.mutationsSent.isEmpty()); @@ -176,7 +174,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica1, mutation(cell2)); repairs.put(replica2, mutation(cell1)); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs); handler.sendInitialRepairs(); handler.ack(target1); handler.ack(target2); @@ -197,7 +195,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica1, mutation(cell2)); repairs.put(replica2, mutation(cell1)); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs); handler.sendInitialRepairs(); // we've already sent mutations to all candidates, so we shouldn't send any more @@ -219,7 +217,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica1, repair1); // check that the correct initial mutations are sent out - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan(replicas, EndpointsForRange.of(replica1, replica2))); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, repairPlan(replicas, EndpointsForRange.of(replica1, replica2))); handler.sendInitialRepairs(); Assert.assertEquals(1, handler.mutationsSent.size()); Assert.assertTrue(handler.mutationsSent.containsKey(target1)); @@ -240,7 +238,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica3, mutation(cell3)); Assert.assertEquals(3, repairs.size()); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs); handler.sendInitialRepairs(); Assert.assertFalse(getCurrentRepairStatus(handler)); @@ -266,8 +264,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest repairs.put(remote1, mutation(cell1)); EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2); - ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan); + ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants)); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan); handler.sendInitialRepairs(); Assert.assertEquals(2, handler.mutationsSent.size()); Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint())); diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java index c15d7f4..7806a3f 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import com.google.common.collect.Lists; + import org.apache.cassandra.locator.ReplicaPlan; import org.junit.After; import org.junit.Assert; @@ -81,8 +82,8 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest repairs.put(replica2, repair2); - ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); - DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan); + ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); + DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, writePlan); Assert.assertTrue(handler.updatesByEp.isEmpty()); @@ -117,15 +118,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest return new DiagnosticBlockingRepairHandler(command, replicaPlan, queryStartNanoTime); } - private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaPlan.ForRead<?> replicaPlan) - { - return new DiagnosticPartitionReadRepairHandler<>(key, repairs, maxBlockFor, replicaPlan); - } - - private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor) + private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan) { - EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())); - return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas)); + return new DiagnosticPartitionReadRepairHandler(key, repairs, writePlan); } private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair @@ -170,7 +165,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest } private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> - extends BlockingPartitionRepair<E, P> + extends BlockingPartitionRepair { private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>(); @@ -180,9 +175,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest return e -> candidates.contains(e); } - DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) + DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan) { - super(key, repairs, maxBlockFor, replicaPlan, isLocal()); + super(key, repairs, writePlan, isLocal()); DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent); } diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java index cc7fdf1..c0af493 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java @@ -92,8 +92,9 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest @Test(expected = UnsupportedOperationException.class) public void repairPartitionFailure() { - ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas)); - InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan); - repair.repairPartition(null, Collections.emptyMap(), replicaPlan.get()); + ReplicaPlan.SharedForRangeRead readPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas)); + ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, replicas); + InstrumentedReadRepair repair = createInstrumentedReadRepair(readPlan); + repair.repairPartition(null, Collections.emptyMap(), writePlan); } } diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java index 5ae9dd8..7458e9b 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java @@ -49,8 +49,6 @@ import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; -import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.Message; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; @@ -72,12 +70,11 @@ public class ReadRepairTest static EndpointsForRange targets; private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> - extends BlockingPartitionRepair<E, P> + extends BlockingPartitionRepair { - public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) + public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan) { - super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan, - e -> replicaPlan.consistencyLevel().isDatacenterLocal() && targets.endpoints().contains(e)); + super(Util.dk("not a valid key"), repairs, writePlan, e -> targets.endpoints().contains(e)); } Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>(); @@ -161,10 +158,11 @@ public class ReadRepairTest return new Mutation(PartitionUpdate.singleRowUpdate(cfm, key, builder.build())); } - private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets) + private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, EndpointsForRange all, EndpointsForRange targets) { - ReplicaPlan.ForRangeRead replicaPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets); - return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan); + ReplicaPlan.ForRangeRead readPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets); + ReplicaPlan.ForTokenWrite writePlan = AbstractReadRepairTest.repairPlan(readPlan); + return new InstrumentedReadRepairHandler(repairs, writePlan); } @Test @@ -198,8 +196,7 @@ public class ReadRepairTest repairs.put(target1, repair1); repairs.put(target2, repair2); - InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, - targets, EndpointsForRange.of(target1, target2)); + InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, targets, EndpointsForRange.of(target1, target2)); Assert.assertTrue(handler.mutationsSent.isEmpty()); @@ -234,7 +231,7 @@ public class ReadRepairTest repairs.put(target2, mutation(cell1)); EndpointsForRange replicas = EndpointsForRange.of(target1, target2); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, targets); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, replicas, targets); handler.sendInitialRepairs(); handler.ack(target1.endpoint()); handler.ack(target2.endpoint()); @@ -255,7 +252,7 @@ public class ReadRepairTest repairs.put(target1, mutation(cell2)); repairs.put(target2, mutation(cell1)); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, EndpointsForRange.of(target1, target2), + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, EndpointsForRange.of(target1, target2), EndpointsForRange.of(target1, target2)); handler.sendInitialRepairs(); @@ -278,7 +275,7 @@ public class ReadRepairTest repairs.put(target1, repair1); // check that the correct initial mutations are sent out - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, targets, EndpointsForRange.of(target1, target2)); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, targets, EndpointsForRange.of(target1, target2)); handler.sendInitialRepairs(); Assert.assertEquals(1, handler.mutationsSent.size()); Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint())); @@ -301,7 +298,7 @@ public class ReadRepairTest Assert.assertEquals(3, repairs.size()); EndpointsForRange replicas = EndpointsForRange.of(target1, target2, target3); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, replicas, replicas); handler.sendInitialRepairs(); Assert.assertFalse(getCurrentRepairStatus(handler)); @@ -311,7 +308,6 @@ public class ReadRepairTest // here we should stop blocking, even though we've sent 3 repairs handler.ack(target2.endpoint()); Assert.assertTrue(getCurrentRepairStatus(handler)); - } /** @@ -330,7 +326,7 @@ public class ReadRepairTest EndpointsForRange participants = EndpointsForRange.of(target1, target2, remote1, remote2); EndpointsForRange targets = EndpointsForRange.of(target1, target2); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants, targets); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, participants, targets); handler.sendInitialRepairs(); Assert.assertEquals(2, handler.mutationsSent.size()); Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint())); diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java index 53964cb..84276d5 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java @@ -113,7 +113,7 @@ public class TestableReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan) { for (Map.Entry<Replica, Mutation> entry: mutations.entrySet()) sent.put(entry.getKey().endpoint(), entry.getValue()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org