http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 75885ae..c296cba 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -21,8 +21,9 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; -import org.apache.cassandra.locator.ReplicaLayout; - +import com.google.common.base.Predicates; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,6 @@ import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import static com.google.common.collect.Iterables.all; -import static com.google.common.collect.Iterables.tryFind; /** * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel. @@ -65,24 +65,25 @@ public abstract class AbstractReadExecutor private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class); protected final ReadCommand command; - private final ReplicaLayout.ForToken replicaLayout; - protected final ReadRepair<EndpointsForToken, ReplicaLayout.ForToken> readRepair; - protected final DigestResolver<EndpointsForToken, ReplicaLayout.ForToken> digestResolver; - protected final ReadCallback<EndpointsForToken, ReplicaLayout.ForToken> handler; + private final ReplicaPlan.SharedForTokenRead replicaPlan; + protected final ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> readRepair; + protected final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> digestResolver; + protected final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler; protected final TraceState traceState; protected final ColumnFamilyStore cfs; protected final long queryStartNanoTime; private final int initialDataRequestCount; protected volatile PartitionIterator result = null; - AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, int initialDataRequestCount, long queryStartNanoTime) + AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, int initialDataRequestCount, long queryStartNanoTime) { this.command = command; - this.replicaLayout = replicaLayout; + this.replicaPlan = ReplicaPlan.shared(replicaPlan); this.initialDataRequestCount = initialDataRequestCount; - this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime); - this.digestResolver = new DigestResolver<>(command, replicaLayout, queryStartNanoTime); - this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime); + // the ReadRepair and DigestResolver both need to see our updated + this.readRepair = ReadRepair.create(command, this.replicaPlan, queryStartNanoTime); + this.digestResolver = new DigestResolver<>(command, this.replicaPlan, queryStartNanoTime); + this.handler = new ReadCallback<>(digestResolver, command, this.replicaPlan, queryStartNanoTime); this.cfs = cfs; this.traceState = Tracing.instance.get(); this.queryStartNanoTime = queryStartNanoTime; @@ -93,7 +94,7 @@ public abstract class AbstractReadExecutor // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once // we stop being compatible with pre-3.0 nodes. int digestVersion = MessagingService.current_version; - for (Replica replica : replicaLayout.selected()) + for (Replica replica : replicaPlan.contacts()) digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica.endpoint())); command.setDigestVersion(digestVersion); } @@ -168,7 +169,7 @@ public abstract class AbstractReadExecutor */ public void executeAsync() { - EndpointsForToken selected = replicaLayout().selected(); + EndpointsForToken selected = replicaPlan().contacts(); EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount); makeFullDataRequests(fullDataRequests); makeTransientDataRequests(selected.filter(Replica::isTransient)); @@ -184,30 +185,25 @@ public abstract class AbstractReadExecutor ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry; - // Endpoints for Token - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry); + ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry); // Speculative retry is disabled *OR* // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM) - // TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0 - return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false); + return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false); // There are simply no extra replicas to speculate. // Handle this separately so it can record failed attempts to speculate due to lack of replicas - if (replicaLayout.selected().size() == replicaLayout.all().size()) + if (replicaPlan.contacts().size() == replicaPlan.candidates().size()) { boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL; - return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation); + return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation); } - // If CL.ALL, upgrade to AlwaysSpeculating; - // If We are going to contact every node anyway, ask for 2 full data requests instead of 1, for redundancy - // (same amount of requests in total, but we turn 1 digest request into a full blown data request) if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE)) - return new AlwaysSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime); + return new AlwaysSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime); else // PERCENTILE or CUSTOM. - return new SpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime); + return new SpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime); } /** @@ -223,9 +219,9 @@ public abstract class AbstractReadExecutor return !handler.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS); } - ReplicaLayout.ForToken replicaLayout() + ReplicaPlan.ForTokenRead replicaPlan() { - return replicaLayout; + return replicaPlan.get(); } void onReadTimeout() {} @@ -239,9 +235,9 @@ public abstract class AbstractReadExecutor */ private final boolean logFailedSpeculation; - public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, long queryStartNanoTime, boolean logFailedSpeculation) + public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime, boolean logFailedSpeculation) { - super(cfs, command, replicaLayout, 1, queryStartNanoTime); + super(cfs, command, replicaPlan, 1, queryStartNanoTime); this.logFailedSpeculation = logFailedSpeculation; } @@ -260,13 +256,13 @@ public abstract class AbstractReadExecutor public SpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, - ReplicaLayout.ForToken replicaLayout, + ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime) { // We're hitting additional targets for read repair (??). Since our "extra" replica is the least- // preferred by the snitch, we do an extra data read to start with against a replica more // likely to reply; better to let RR fail than the entire query. - super(cfs, command, replicaLayout, replicaLayout.blockFor() < replicaLayout.selected().size() ? 2 : 1, queryStartNanoTime); + super(cfs, command, replicaPlan, replicaPlan.blockFor() < replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime); } public void maybeTryAdditionalReplicas() @@ -277,12 +273,12 @@ public abstract class AbstractReadExecutor cfs.metric.speculativeRetries.inc(); speculated = true; + ReplicaPlan.ForTokenRead replicaPlan = replicaPlan(); ReadCommand retryCommand = command; Replica extraReplica; if (handler.resolver.isDataPresent()) { - extraReplica = tryFind(replicaLayout().all(), - r -> !replicaLayout().selected().contains(r)).orNull(); + extraReplica = replicaPlan.firstUncontactedCandidate(Predicates.alwaysTrue()); // we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against assert extraReplica != null; @@ -293,8 +289,7 @@ public abstract class AbstractReadExecutor } else { - extraReplica = tryFind(replicaLayout().all(), - r -> r.isFull() && !replicaLayout().selected().contains(r)).orNull(); + extraReplica = replicaPlan.firstUncontactedCandidate(Replica::isFull); if (extraReplica == null) { cfs.metric.speculativeInsufficientReplicas.inc(); @@ -304,6 +299,11 @@ public abstract class AbstractReadExecutor } } + // we must update the plan to include this new node, else when we come to read-repair, we may not include this + // speculated response in the data requests we make again, and we will not be able to 'speculate' an extra repair read, + // nor would we be able to speculate a new 'write' if the repair writes are insufficient + super.replicaPlan.addToContacts(extraReplica); + if (traceState != null) traceState.trace("speculating read retry on {}", extraReplica); logger.trace("speculating read retry on {}", extraReplica); @@ -325,10 +325,12 @@ public abstract class AbstractReadExecutor { public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, - ReplicaLayout.ForToken replicaLayout, + ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime) { - super(cfs, command, replicaLayout, replicaLayout.selected().size() > 1 ? 2 : 1, queryStartNanoTime); + // presumably, we speculate an extra data request here in case it is our data request that fails to respond, + // and there are no more nodes to consult + super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 ? 2 : 1, queryStartNanoTime); } public void maybeTryAdditionalReplicas() @@ -403,7 +405,7 @@ public abstract class AbstractReadExecutor logger.trace("Timed out waiting on digest mismatch repair requests"); // the caught exception here will have CL.ALL from the repair command, // not whatever CL the initial command was at (CASSANDRA-7947) - throw new ReadTimeoutException(replicaLayout().consistencyLevel(), handler.blockfor - 1, handler.blockfor, true); + throw new ReadTimeoutException(replicaPlan().consistencyLevel(), handler.blockFor - 1, handler.blockFor, true); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java index a6901b2..db5f3c8 100644 --- a/src/java/org/apache/cassandra/service/reads/DataResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java @@ -42,7 +42,7 @@ import org.apache.cassandra.db.transform.Filter; import org.apache.cassandra.db.transform.FilteredPartitions; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.locator.Endpoints; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.reads.repair.ReadRepair; @@ -52,14 +52,14 @@ import org.apache.cassandra.service.reads.repair.RepairedDataVerifier; import static com.google.common.collect.Iterables.*; import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener; -public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L> +public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P> { private final boolean enforceStrictLiveness; - private final ReadRepair<E, L> readRepair; + private final ReadRepair<E, P> readRepair; - public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime) + public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime) { - super(command, replicaLayout, queryStartNanoTime); + super(command, replicaPlan, queryStartNanoTime); this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); this.readRepair = readRepair; } @@ -83,7 +83,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> Collection<MessageIn<ReadResponse>> messages = responses.snapshot(); assert !any(messages, msg -> msg.payload.isDigestResponse()); - E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from)); + E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from), false); List<UnfilteredPartitionIterator> iters = new ArrayList<>( Collections2.transform(messages, msg -> msg.payload.makeIterator(command))); assert replicas.size() == iters.size(); @@ -121,7 +121,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, - replicaLayout.withSelected(replicas), + replicaPlan.getWithContacts(replicas), mergedResultCounter, repairedDataTracker); FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); @@ -135,7 +135,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> } private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, - L sources, + P sources, DataLimits.Counter mergedResultCounter, RepairedDataTracker repairedDataTracker) { @@ -150,7 +150,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> */ if (!command.limits().isUnlimited()) for (int i = 0; i < results.size(); i++) - results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness)); + results.set(i, ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness)); return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker)); } @@ -161,7 +161,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> } private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, - L sources, + P sources, RepairedDataTracker repairedDataTracker) { // Avoid wrapping no-op listeners as it doesn't throw @@ -191,7 +191,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> table, mergedDeletion == null ? "null" : mergedDeletion.toString(), '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']', - sources.selected(), + sources.contacts(), makeResponsesDebugString(partitionKey)); throw new AssertionError(details, e); } @@ -212,7 +212,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> table, merged == null ? "null" : merged.toString(table), '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', - sources.selected(), + sources.contacts(), makeResponsesDebugString(partitionKey)); throw new AssertionError(details, e); } @@ -238,7 +238,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> table, merged == null ? "null" : merged.toString(table), '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', - sources.selected(), + sources.contacts(), makeResponsesDebugString(partitionKey)); throw new AssertionError(details, e); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java index 28c2117..0dcae95 100644 --- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java @@ -29,10 +29,10 @@ import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.service.reads.repair.NoopReadRepair; import org.apache.cassandra.service.reads.repair.ReadRepair; @@ -40,13 +40,13 @@ import org.apache.cassandra.utils.ByteBufferUtil; import static com.google.common.collect.Iterables.any; -public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L> +public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P> { private volatile MessageIn<ReadResponse> dataResponse; - public DigestResolver(ReadCommand command, L replicas, long queryStartNanoTime) + public DigestResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { - super(command, replicas, queryStartNanoTime); + super(command, replicaPlan, queryStartNanoTime); Preconditions.checkArgument(command instanceof SinglePartitionReadCommand, "DigestResolver can only be used with SinglePartitionReadCommand commands"); } @@ -55,7 +55,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L public void preprocess(MessageIn<ReadResponse> message) { super.preprocess(message); - Replica replica = replicaLayout.getReplicaFor(message.from); + Replica replica = replicaPlan().getReplicaFor(message.from); if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull()) { dataResponse = message; @@ -76,7 +76,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L { return any(responses, msg -> !msg.payload.isDigestResponse() - && replicaLayout.getReplicaFor(msg.from).isTransient()); + && replicaPlan().getReplicaFor(msg.from).isTransient()); } public PartitionIterator getData() @@ -93,16 +93,14 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L { // This path can be triggered only if we've got responses from full replicas and they match, but // transient replica response still contains data, which needs to be reconciled. - DataResolver<E, L> dataResolver = new DataResolver<>(command, - replicaLayout, - NoopReadRepair.instance, - queryStartNanoTime); + DataResolver<E, P> dataResolver + = new DataResolver<>(command, replicaPlan, NoopReadRepair.instance, queryStartNanoTime); dataResolver.preprocess(dataResponse); // Reconcile with transient replicas for (MessageIn<ReadResponse> response : responses) { - Replica replica = replicaLayout.getReplicaFor(response.from); + Replica replica = replicaPlan().getReplicaFor(response.from); if (replica.isTransient()) dataResolver.preprocess(response); } @@ -119,7 +117,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L ByteBuffer digest = null; for (MessageIn<ReadResponse> message : responses.snapshot()) { - if (replicaLayout.getReplicaFor(message.from).isTransient()) + if (replicaPlan().getReplicaFor(message.from).isTransient()) continue; ByteBuffer newDigest = message.payload.digest(command); http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java index 3d39377..7a2385c 100644 --- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java @@ -23,19 +23,18 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.ReplicaPlan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.PartitionRangeReadCommand; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.RequestFailureReason; -import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.locator.Endpoints; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; @@ -44,16 +43,17 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.SimpleCondition; -public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements IAsyncCallbackWithFailure<ReadResponse> +public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements IAsyncCallbackWithFailure<ReadResponse> { protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class ); public final ResponseResolver resolver; final SimpleCondition condition = new SimpleCondition(); private final long queryStartNanoTime; - // TODO: move to replica layout as well? - final int blockfor; - final L replicaLayout; + final int blockFor; // TODO: move to replica plan as well? + // this uses a plain reference, but is initialised before handoff to any other threads; the later updates + // may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object + final ReplicaPlan.Shared<E, P> replicaPlan; private final ReadCommand command; private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received"); @@ -63,19 +63,24 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> private volatile int failures = 0; private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint; - public ReadCallback(ResponseResolver resolver, int blockfor, ReadCommand command, L replicaLayout, long queryStartNanoTime) + public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { this.command = command; - this.blockfor = blockfor; this.resolver = resolver; this.queryStartNanoTime = queryStartNanoTime; - this.replicaLayout = replicaLayout; + this.replicaPlan = replicaPlan; + this.blockFor = replicaPlan.get().blockFor(); this.failureReasonByEndpoint = new ConcurrentHashMap<>(); // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897) - assert !(command instanceof PartitionRangeReadCommand) || blockfor >= replicaLayout.selected().size(); + assert !(command instanceof PartitionRangeReadCommand) || blockFor >= replicaPlan().contacts().size(); if (logger.isTraceEnabled()) - logger.trace("Blockfor is {}; setting up requests to {}", blockfor, this.replicaLayout); + logger.trace("Blockfor is {}; setting up requests to {}", blockFor, this.replicaPlan); + } + + protected P replicaPlan() + { + return replicaPlan.get(); } public boolean await(long timePastStart, TimeUnit unit) @@ -94,30 +99,30 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> public void awaitResults() throws ReadFailureException, ReadTimeoutException { boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS); - boolean failed = failures > 0 && blockfor + failures > replicaLayout.selected().size(); + boolean failed = failures > 0 && blockFor + failures > replicaPlan().contacts().size(); if (signaled && !failed) return; if (Tracing.isTracing()) { String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; - Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData }); + Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData }); } else if (logger.isDebugEnabled()) { String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; - logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData }); + logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData }); } // Same as for writes, see AbstractWriteResponseHandler throw failed - ? new ReadFailureException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint) - : new ReadTimeoutException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent()); + ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint) + : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent()); } public int blockFor() { - return blockfor; + return blockFor; } public void response(MessageIn<ReadResponse> message) @@ -127,24 +132,16 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ? recievedUpdater.incrementAndGet(this) : received; - if (n >= blockfor && resolver.isDataPresent()) + if (n >= blockFor && resolver.isDataPresent()) condition.signalAll(); } /** - * @return true if the message counts towards the blockfor threshold + * @return true if the message counts towards the blockFor threshold */ private boolean waitingFor(InetAddressAndPort from) { - return !replicaLayout.consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from)); - } - - /** - * @return the current number of received responses - */ - public int getReceivedCount() - { - return received; + return !replicaPlan().consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from)); } public void response(ReadResponse result) @@ -157,11 +154,6 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> response(message); } - public void assureSufficientLiveNodes() throws UnavailableException - { - replicaLayout.consistencyLevel().assureSufficientLiveNodesForRead(replicaLayout.keyspace(), replicaLayout.selected()); - } - public boolean isLatencyForSnitch() { return true; @@ -176,7 +168,7 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> failureReasonByEndpoint.put(from, failureReason); - if (blockfor + n > replicaLayout.selected().size()) + if (blockFor + n > replicaPlan().contacts().size()) condition.signalAll(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java index 298f843..0c1e1ba 100644 --- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java +++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java @@ -23,30 +23,34 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.locator.Endpoints; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.utils.concurrent.Accumulator; -public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> +public abstract class ResponseResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> { protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class); protected final ReadCommand command; - protected final L replicaLayout; + protected final ReplicaPlan.Shared<E, P> replicaPlan; // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints protected final Accumulator<MessageIn<ReadResponse>> responses; protected final long queryStartNanoTime; - public ResponseResolver(ReadCommand command, L replicaLayout, long queryStartNanoTime) + public ResponseResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { this.command = command; - this.replicaLayout = replicaLayout; - // TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes) - this.responses = new Accumulator<>(replicaLayout.all().size()); + this.replicaPlan = replicaPlan; + this.responses = new Accumulator<>(replicaPlan.get().candidates().size()); this.queryStartNanoTime = queryStartNanoTime; } + protected P replicaPlan() + { + return replicaPlan.get(); + } + public abstract boolean isDataPresent(); public void preprocess(MessageIn<ReadResponse> message) @@ -57,8 +61,8 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica } catch (IllegalStateException e) { - logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica layout: {}", - message, command, replicaLayout); + logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica plan: {}", + message, command, replicaPlan); throw e; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java index 580b790..b16d105 100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java @@ -19,6 +19,8 @@ package org.apache.cassandra.service.reads; import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +42,6 @@ import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.ExcludingBounds; import org.apache.cassandra.dht.Range; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.reads.repair.NoopReadRepair; @@ -87,10 +88,11 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI * If we don't apply the transformation *after* extending the partition with MoreRows, * applyToRow() method of protection will not be called on the first row of the new extension iterator. */ - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source); + ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source); + ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan); ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(), command, source, - (cmd) -> executeReadCommand(cmd, replicaLayout), + (cmd) -> executeReadCommand(cmd, sharedReplicaPlan), singleResultCounter, mergedResultCounter); return Transformation.apply(MoreRows.extend(partition, protection), protection); @@ -169,14 +171,15 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI : new ExcludingBounds<>(lastPartitionKey, bounds.right); DataRange newDataRange = cmd.dataRange().forSubRange(newBounds); - ReplicaLayout.ForRange replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source); - return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), replicaLayout); + ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source); + return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan)); } - private <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, L replicaLayout) + private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan) { - DataResolver<E, L> resolver = new DataResolver<>(cmd, replicaLayout, (NoopReadRepair<E, L>)NoopReadRepair.instance, queryStartNanoTime); - ReadCallback<E, L> handler = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), cmd, replicaLayout, queryStartNanoTime); + DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime); + ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); if (source.isLocal()) StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index 528d31b..1b213ff 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -24,6 +24,7 @@ import java.util.function.Consumer; import com.google.common.base.Preconditions; import com.codahale.metrics.Meter; +import com.google.common.base.Predicates; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; @@ -34,7 +35,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.DigestResolver; import org.apache.cassandra.service.reads.ReadCallback; import org.apache.cassandra.tracing.Tracing; -public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L> +public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + implements ReadRepair<E, P> { protected final ReadCommand command; protected final long queryStartNanoTime; - protected final L replicaLayout; + protected final ReplicaPlan.Shared<E, P> replicaPlan; protected final ColumnFamilyStore cfs; private volatile DigestRepair digestRepair = null; @@ -68,15 +70,20 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli } public AbstractReadRepair(ReadCommand command, - L replicaLayout, + ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { this.command = command; this.queryStartNanoTime = queryStartNanoTime; - this.replicaLayout = replicaLayout; + this.replicaPlan = replicaPlan; this.cfs = Keyspace.openAndGetStore(command.metadata()); } + protected P replicaPlan() + { + return replicaPlan.get(); + } + void sendReadCommand(Replica to, ReadCallback readCallback) { MessageOut<ReadCommand> message = command.createMessage(); @@ -90,14 +97,13 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli abstract Meter getRepairMeter(); // digestResolver isn't used here because we resend read requests to all participants - public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer) + public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer) { getRepairMeter().mark(); // Do a full data read to resolve the correct response (and repair node that need be) - DataResolver<E, L> resolver = new DataResolver<>(command, replicaLayout, this, queryStartNanoTime); - ReadCallback<E, L> readCallback = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(cfs.keyspace), - command, replicaLayout, queryStartNanoTime); + DataResolver<E, P> resolver = new DataResolver<>(command, replicaPlan, this, queryStartNanoTime); + ReadCallback<E, P> readCallback = new ReadCallback<>(resolver, command, replicaPlan, queryStartNanoTime); digestRepair = new DigestRepair(resolver, readCallback, resultConsumer); @@ -105,12 +111,12 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()) command.trackRepairedStatus(); - for (Replica replica : replicaLayout.selected()) + for (Replica replica : replicaPlan().contacts()) { Tracing.trace("Enqueuing full data read to {}", replica); sendReadCommand(replica, readCallback); } - ReadRepairDiagnostics.startRepair(this, replicaLayout, digestResolver); + ReadRepairDiagnostics.startRepair(this, replicaPlan(), digestResolver); } public void awaitReads() throws ReadTimeoutException @@ -125,7 +131,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli private boolean shouldSpeculate() { - ConsistencyLevel consistency = replicaLayout.consistencyLevel(); + ConsistencyLevel consistency = replicaPlan().consistencyLevel(); ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; return consistency != ConsistencyLevel.EACH_QUORUM && consistency.satisfies(speculativeCL, cfs.keyspace) @@ -142,15 +148,15 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS)) { - L uncontacted = replicaLayout.forNaturalUncontacted(); - if (uncontacted.selected().isEmpty()) + Replica uncontacted = replicaPlan().firstUncontactedCandidate(Predicates.alwaysTrue()); + if (uncontacted == null) return; - Replica replica = uncontacted.selected().iterator().next(); - Tracing.trace("Enqueuing speculative full data read to {}", replica); - sendReadCommand(replica, repair.readCallback); + replicaPlan.addToContacts(uncontacted); + Tracing.trace("Enqueuing speculative full data read to {}", uncontacted); + sendReadCommand(uncontacted, repair.readCallback); ReadRepairMetrics.speculatedRead.mark(); - ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted); + ReadRepairDiagnostics.speculatedRead(this, uncontacted.endpoint(), replicaPlan()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java index 54af2cf..f536ea8 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java @@ -37,9 +37,9 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.locator.Endpoints; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.net.IAsyncCallback; @@ -49,25 +49,26 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.tracing.Tracing; -public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractFuture<Object> implements IAsyncCallback<Object> +public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends AbstractFuture<Object> implements IAsyncCallback<Object> { private final DecoratedKey key; - private final L replicaLayout; + private final P replicaPlan; private final Map<Replica, Mutation> pendingRepairs; private final CountDownLatch latch; private volatile long mutationsSentTime; - public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout) + public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { this.key = key; this.pendingRepairs = new ConcurrentHashMap<>(repairs); - this.replicaLayout = replicaLayout; + this.replicaPlan = replicaPlan; // here we remove empty repair mutations from the block for total, since // we're not sending them mutations int blockFor = maxBlockFor; - for (Replica participant: replicaLayout.selected()) + for (Replica participant: replicaPlan.contacts()) { // remote dcs can sometimes get involved in dc-local reads. We want to repair // them if they do, but they shouldn't interfere with blocking the client read. @@ -97,7 +98,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa private boolean shouldBlockOn(InetAddressAndPort endpoint) { - return !replicaLayout.consistencyLevel().isDatacenterLocal() || isLocal(endpoint); + return !replicaPlan.consistencyLevel().isDatacenterLocal() || isLocal(endpoint); } @VisibleForTesting @@ -105,7 +106,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa { if (shouldBlockOn(from)) { - pendingRepairs.remove(replicaLayout.getReplicaFor(from)); + pendingRepairs.remove(replicaPlan.getReplicaFor(from)); latch.countDown(); } } @@ -198,8 +199,8 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa if (awaitRepairs(timeout, timeoutUnit)) return; - L newCandidates = replicaLayout.forNaturalUncontacted(); - if (newCandidates.selected().isEmpty()) + E newCandidates = replicaPlan.uncontactedCandidates(); + if (newCandidates.isEmpty()) return; PartitionUpdate update = mergeUnackedUpdates(); @@ -212,7 +213,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa Mutation[] versionedMutations = new Mutation[msgVersionIdx(MessagingService.current_version) + 1]; - for (Replica replica : newCandidates.selected()) + for (Replica replica : newCandidates) { int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(replica.endpoint())); @@ -220,7 +221,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa if (mutation == null) { - mutation = BlockingReadRepairs.createRepairMutation(update, replicaLayout.consistencyLevel(), replica.endpoint(), true); + mutation = BlockingReadRepairs.createRepairMutation(update, replicaPlan.consistencyLevel(), replica.endpoint(), true); versionedMutations[versionIdx] = mutation; } @@ -239,7 +240,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa Keyspace getKeyspace() { - return replicaLayout.keyspace(); + return replicaPlan.keyspace(); } DecoratedKey getKey() @@ -249,6 +250,6 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa ConsistencyLevel getConsistency() { - return replicaLayout.consistencyLevel(); + return replicaPlan.consistencyLevel(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java index 402aed0..938abaf 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.locator.Endpoints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +33,9 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.tracing.Tracing; @@ -44,22 +44,23 @@ import org.apache.cassandra.tracing.Tracing; * updates have been written to nodes needing correction. Breaks write * atomicity in some situations */ -public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L> +public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends AbstractReadRepair<E, P> { private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class); protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>(); private final int blockFor; - BlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime) + BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { - super(command, replicaLayout, queryStartNanoTime); - this.blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace); + super(command, replicaPlan, queryStartNanoTime); + this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace); } - public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout) + public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan) { - return new PartitionIteratorMergeListener(replicaLayout, command, this.replicaLayout.consistencyLevel(), this); + return new PartitionIteratorMergeListener<>(replicaPlan, command, this); } @Override @@ -91,20 +92,20 @@ public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout< if (timedOut) { // We got all responses, but timed out while repairing - int blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace); + int blockFor = replicaPlan().blockFor(); if (Tracing.isTracing()) Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); else logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(replicaLayout.consistencyLevel(), blockFor - 1, blockFor, true); + throw new ReadTimeoutException(replicaPlan().consistencyLevel(), blockFor - 1, blockFor, true); } } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan) { - BlockingPartitionRepair<E, L> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaLayout); + BlockingPartitionRepair<E, P> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan); blockingRepair.sendInitialRepairs(); repairs.add(blockingRepair); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java index 4af4a92..6aa6ece 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java @@ -26,28 +26,29 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.exceptions.ReadTimeoutException; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.service.reads.DigestResolver; /** * Bypasses the read repair path for short read protection and testing */ -public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L> +public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + implements ReadRepair<E, P> { public static final NoopReadRepair instance = new NoopReadRepair(); private NoopReadRepair() {} @Override - public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicas) + public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicas) { return UnfilteredPartitionIterators.MergeListener.NOOP; } @Override - public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer) + public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer) { resultConsumer.accept(digestResolver.getData()); } @@ -75,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan) { } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java index 4cae3ae..7247704 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java @@ -28,26 +28,26 @@ import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaPlan; -public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener +public class PartitionIteratorMergeListener<E extends Endpoints<E>> + implements UnfilteredPartitionIterators.MergeListener { - private final ReplicaLayout replicaLayout; + private final ReplicaPlan.ForRead<E> replicaPlan; private final ReadCommand command; - private final ConsistencyLevel consistency; private final ReadRepair readRepair; - public PartitionIteratorMergeListener(ReplicaLayout replicaLayout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair) + public PartitionIteratorMergeListener(ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair) { - this.replicaLayout = replicaLayout; + this.replicaPlan = replicaPlan; this.command = command; - this.consistency = consistency; this.readRepair = readRepair; } public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) { - return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), replicaLayout, command, consistency, readRepair); + return new RowIteratorMergeListener<>(partitionKey, columns(versions), isReversed(versions), replicaPlan, command, readRepair); } protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions) http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java index c13e2d6..64bfec2 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java @@ -27,22 +27,23 @@ import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.metrics.ReadRepairMetrics; /** * Only performs the collection of data responses and reconciliation of them, doesn't send repair mutations * to replicas. This preserves write atomicity, but doesn't provide monotonic quorum reads */ -public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L> +public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + extends AbstractReadRepair<E, P> { - ReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime) + ReadOnlyReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { - super(command, replicaLayout, queryStartNanoTime); + super(command, replicaPlan, queryStartNanoTime); } @Override - public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout) + public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan) { return UnfilteredPartitionIterators.MergeListener.NOOP; } @@ -60,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout< } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan) { throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java index 168f003..9441945 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.reads.repair; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.locator.Endpoints; @@ -29,17 +30,19 @@ import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.service.reads.DigestResolver; -public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> +public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> { public interface Factory { - <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime); + <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime); } - static <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaPlan, long queryStartNanoTime) + static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { return command.metadata().params.readRepair.create(command, replicaPlan, queryStartNanoTime); } @@ -47,7 +50,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L /** * Used by DataResolver to generate corrections as the partition iterator is consumed */ - UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout); + UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan); /** * Called when the digests from the initial read don't match. Reads may block on the @@ -55,7 +58,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L * @param digestResolver supplied so we can get the original data response * @param resultConsumer hook for the repair to set it's result on completion */ - public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer); + public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer); /** * Block on the reads (or timeout) sent out in {@link ReadRepair#startRepair} @@ -90,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L * Repairs a partition _after_ receiving data responses. This method receives replica list, since * we will block repair only on the replicas that have responded. */ - void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout); + void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java index 4c74a89..b9167bd 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.diag.DiagnosticEventService; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.service.reads.DigestResolver; import org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType; import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType; @@ -38,22 +39,22 @@ final class ReadRepairDiagnostics { } - static void startRepair(AbstractReadRepair readRepair, ReplicaLayout<?, ?> layout, DigestResolver digestResolver) + static void startRepair(AbstractReadRepair readRepair, ReplicaPlan.ForRead<?> fullPlan, DigestResolver digestResolver) { if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR)) service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR, readRepair, - layout.selected().endpoints(), - layout.all().endpoints(), digestResolver)); + fullPlan.contacts().endpoints(), + fullPlan.candidates().endpoints(), digestResolver)); } static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint, - ReplicaLayout<?, ?> replicaLayout) + ReplicaPlan.ForRead<?> fullPlan) { if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ)) service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ, readRepair, Collections.singletonList(endpoint), - Lists.newArrayList(replicaLayout.all().endpoints()), null)); + Lists.newArrayList(fullPlan.candidates().endpoints()), null)); } static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation) http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java index 9e14362..5cec802 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java @@ -67,7 +67,7 @@ final class ReadRepairEvent extends DiagnosticEvent this.keyspace = readRepair.cfs.keyspace; this.tableName = readRepair.cfs.getTableName(); this.cqlCommand = readRepair.command.toCQLString(); - this.consistency = readRepair.replicaLayout.consistencyLevel(); + this.consistency = readRepair.replicaPlan().consistencyLevel(); this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind(); this.destinations = destinations; this.allEndpoints = allEndpoints; http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java index 28c0e9e..7a4b795 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java @@ -21,22 +21,25 @@ package org.apache.cassandra.service.reads.repair; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; public enum ReadRepairStrategy implements ReadRepair.Factory { NONE { - public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime) + public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { - return new ReadOnlyReadRepair<>(command, replicaLayout, queryStartNanoTime); + return new ReadOnlyReadRepair<>(command, replicaPlan, queryStartNanoTime); } }, BLOCKING { - public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime) + public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime) { - return new BlockingReadRepair<>(command, replicaLayout, queryStartNanoTime); + return new BlockingReadRepair<>(command, replicaPlan, queryStartNanoTime); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java index 7fe797a..60e0d41 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java @@ -46,21 +46,21 @@ import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.schema.ColumnMetadata; -public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener +public class RowIteratorMergeListener<E extends Endpoints<E>> + implements UnfilteredRowIterators.MergeListener { private final DecoratedKey partitionKey; private final RegularAndStaticColumns columns; private final boolean isReversed; private final ReadCommand command; - private final ConsistencyLevel consistency; private final PartitionUpdate.Builder[] repairs; private final Row.Builder[] currentRows; private final RowDiffListener diffListener; - private final ReplicaLayout layout; + private final ReplicaPlan.ForRead<E> replicaPlan; // The partition level deletion for the merge row. private DeletionTime partitionLevelDeletion; @@ -73,19 +73,18 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis private final ReadRepair readRepair; - public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaLayout layout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair) + public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair) { this.partitionKey = partitionKey; this.columns = columns; this.isReversed = isReversed; - this.layout = layout; - int size = layout.selected().size(); + this.replicaPlan = replicaPlan; + int size = replicaPlan.contacts().size(); repairs = new PartitionUpdate.Builder[size]; currentRows = new Row.Builder[size]; sourceDeletionTime = new DeletionTime[size]; markerToRepair = new ClusteringBound[size]; this.command = command; - this.consistency = consistency; this.readRepair = readRepair; this.diffListener = new RowDiffListener() @@ -310,7 +309,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis public void close() { Map<Replica, Mutation> mutations = null; - Endpoints<?> sources = layout.selected(); + Endpoints<?> sources = replicaPlan.contacts(); for (int i = 0; i < repairs.length; i++) { if (repairs[i] == null) @@ -318,7 +317,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis Replica source = sources.get(i); - Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, source.endpoint(), false); + Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), replicaPlan.consistencyLevel(), source.endpoint(), false); if (mutation == null) continue; @@ -330,7 +329,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis if (mutations != null) { - readRepair.repairPartition(partitionKey, mutations, layout); + readRepair.repairPartition(partitionKey, mutations, replicaPlan); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java index 66eff23..f937f96 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java @@ -22,7 +22,6 @@ import com.google.common.base.Predicates; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.cassandra.dht.Murmur3Partitioner; @@ -34,13 +33,14 @@ import org.junit.Assert; import org.junit.Test; import java.net.UnknownHostException; -import java.util.Collections; import java.util.Comparator; import java.util.LinkedHashSet; import java.util.List; import java.util.function.Predicate; import java.util.stream.Collectors; +import static com.google.common.collect.Iterables.*; +import static com.google.common.collect.Iterables.filter; import static org.apache.cassandra.locator.Replica.fullReplica; import static org.apache.cassandra.locator.Replica.transientReplica; @@ -111,7 +111,7 @@ public class ReplicaCollectionTest void testEquals() { - Assert.assertTrue(Iterables.elementsEqual(canonicalList, test)); + Assert.assertTrue(elementsEqual(canonicalList, test)); } void testEndpoints() @@ -144,28 +144,6 @@ public class ReplicaCollectionTest Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints()); } - void testSelect(int subListDepth, int filterDepth, int sortDepth, int selectDepth) - { - TestCase<C> allMatchZeroCapacity = new TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), Collections.emptyList()); - allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, selectDepth - 1); - - TestCase<C> noMatchFullCapacity = new TestCase<>(test.select().add(Predicates.alwaysFalse(), canonicalList.size()).get(), Collections.emptyList()); - noMatchFullCapacity.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1); - - if (canonicalList.size() <= 2) - return; - - List<Replica> newOrderList = ImmutableList.of(canonicalList.get(2), canonicalList.get(1), canonicalList.get(0)); - TestCase<C> newOrder = new TestCase<>( - test.select() - .add(r -> r == newOrderList.get(0), 3) - .add(r -> r == newOrderList.get(1), 3) - .add(r -> r == newOrderList.get(2), 3) - .get(), newOrderList - ); - newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1); - } - private void assertSubList(C subCollection, int from, int to) { Assert.assertTrue(subCollection.isSnapshot); @@ -182,7 +160,7 @@ public class ReplicaCollectionTest } } - void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + void testSubList(int subListDepth, int filterDepth, int sortDepth) { if (test.isSnapshot) Assert.assertSame(test, test.subList(0, test.size())); @@ -192,34 +170,62 @@ public class ReplicaCollectionTest TestCase<C> skipFront = new TestCase<>(test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size())); assertSubList(skipFront.test, 1, canonicalList.size()); - skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth); + skipFront.testAll(subListDepth - 1, filterDepth, sortDepth); TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1)); assertSubList(skipBack.test, 0, canonicalList.size() - 1); - skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth); + skipBack.testAll(subListDepth - 1, filterDepth, sortDepth); } - void testFilter(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + void testFilter(int subListDepth, int filterDepth, int sortDepth) { if (test.isSnapshot) Assert.assertSame(test, test.filter(Predicates.alwaysTrue())); if (test.isEmpty()) return; + // remove start // we recurse on the same subset in testSubList, so just corroborate we have the correct list here - assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, canonicalList.size()); + { + Predicate<Replica> removeFirst = r -> r != canonicalList.get(0); + assertSubList(test.filter(removeFirst), 1, canonicalList.size()); + assertSubList(test.filter(removeFirst, 1), 1, Math.min(canonicalList.size(), 2)); + } if (test.size() <= 1) return; + // remove end // we recurse on the same subset in testSubList, so just corroborate we have the correct list here - assertSubList(test.filter(r -> r != canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1); + { + int last = canonicalList.size() - 1; + Predicate<Replica> removeLast = r -> r != canonicalList.get(last); + assertSubList(test.filter(removeLast), 0, last); + } if (test.size() <= 2) return; + Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2); - TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test))); - filtered.testAll(subListDepth, filterDepth - 1, sortDepth, selectDepth); + TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test))); + filtered.testAll(subListDepth, filterDepth - 1, sortDepth); + } + + void testCount() + { + Assert.assertEquals(0, test.count(Predicates.alwaysFalse())); + + if (test.isEmpty()) + { + Assert.assertEquals(0, test.count(Predicates.alwaysTrue())); + return; + } + + for (int i = 0 ; i < canonicalList.size() ; ++i) + { + Replica discount = canonicalList.get(i); + Assert.assertEquals(canonicalList.size() - 1, test.count(r -> r != discount)); + } } void testContains() @@ -235,7 +241,7 @@ public class ReplicaCollectionTest Assert.assertEquals(canonicalList.get(i), test.get(i)); } - void testSort(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + void testSort(int subListDepth, int filterDepth, int sortDepth) { final Comparator<Replica> comparator = (o1, o2) -> { @@ -244,10 +250,10 @@ public class ReplicaCollectionTest return f1 == f2 ? 0 : f1 ? 1 : -1; }; TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList)); - sorted.testAll(subListDepth, filterDepth, sortDepth - 1, selectDepth); + sorted.testAll(subListDepth, filterDepth, sortDepth - 1); } - private void testAll(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + private void testAll(int subListDepth, int filterDepth, int sortDepth) { testEndpoints(); testOrderOfIteration(); @@ -255,19 +261,18 @@ public class ReplicaCollectionTest testGet(); testEquals(); testSize(); + testCount(); if (subListDepth > 0) - testSubList(subListDepth, filterDepth, sortDepth, selectDepth); + testSubList(subListDepth, filterDepth, sortDepth); if (filterDepth > 0) - testFilter(subListDepth, filterDepth, sortDepth, selectDepth); + testFilter(subListDepth, filterDepth, sortDepth); if (sortDepth > 0) - testSort(subListDepth, filterDepth, sortDepth, selectDepth); - if (selectDepth > 0) - testSelect(subListDepth, filterDepth, sortDepth, selectDepth); + testSort(subListDepth, filterDepth, sortDepth); } public void testAll() { - testAll(2, 2, 2, 2); + testAll(2, 2, 2); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org