/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.locator;

import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.utils.FBUtilities;

import java.util.Collection;
import java.util.function.Predicate;

import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.limit;

/**
 * Factory methods that build {@link ReplicaPlan} instances for reads and writes:
 * given a keyspace, consistency level and token/range, they compute the candidate
 * replicas (live and down), select the contacts for the operation, and fail early
 * with {@link UnavailableException} when the consistency level cannot be met.
 */
public class ReplicaPlans
{

    /**
     * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive.
     */
    public static ReplicaPlan.ForTokenWrite forSingleReplicaWrite(Keyspace keyspace, Token token, Replica replica)
    {
        EndpointsForToken one = EndpointsForToken.of(token, replica);
        EndpointsForToken empty = EndpointsForToken.empty(token);
        return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, empty, one, one, one);
    }

    /**
     * A forwarding counter write is always sent to a single owning coordinator for the range, by the original coordinator
     * (if it is not itself an owner)
     */
    public static ReplicaPlan.ForTokenWrite forForwardingCounterWrite(Keyspace keyspace, Token token, Replica replica)
    {
        return forSingleReplicaWrite(keyspace, token, replica);
    }

    /**
     * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
     * Note that the liveAndDown collection and live are equal to the provided endpoints.
     *
     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
     */
    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
    {
        // A single case we write not for range or token, but multiple mutations to many tokens
        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();

        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
                EndpointsForToken.empty(token)
        );
        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;

        // assume that we have already been given live endpoints, and skip applying the failure detector
        return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
    }

    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
    {
        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
    }

    @VisibleForTesting
    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
    {
        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
    }

    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException
    {
        return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector);
    }

    @VisibleForTesting
    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
    {
        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
        return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector);
    }

    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
    {
        EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
        ReplicaPlan.ForTokenWrite result = new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
        result.assureSufficientReplicas();
        return result;
    }

    public interface Selector
    {
        <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
    }

    /**
     * Select all nodes, transient or otherwise, as targets for the operation.
     *
     * This may no longer be useful until we finish implementing transient replication support, however
     * it can be of value to stipulate that a location writes to all nodes without regard to transient status.
     */
    public static final Selector writeAll = new Selector()
    {
        @Override
        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
        {
            return liveAndDown.all();
        }
    };

    /**
     * Select all full nodes, live or down, as write targets.  If there are insufficient nodes to complete the write,
     * but there are live transient nodes, select a sufficient number of these to reach our consistency level.
     *
     * Pending nodes are always contacted, whether or not they are full.  When a transient replica is undergoing
     * a pending move to a new node, if we write (transiently) to it, this write would not be replicated to the
     * pending transient node, and so when completing the move, the write could effectively have not reached the
     * promised consistency level.
     */
    public static final Selector writeNormal = new Selector()
    {
        @Override
        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
        {
            // Fast path: no transient replicas anywhere means everyone is contacted.
            if (!any(liveAndDown.all(), Replica::isTransient))
                return liveAndDown.all();

            assert consistencyLevel != ConsistencyLevel.EACH_QUORUM;

            ReplicaCollection.Mutable<E> contacts = liveAndDown.all().newMutable(liveAndDown.all().size());
            contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
            contacts.addAll(liveAndDown.pending());

            // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all)
            int liveCount = contacts.count(live.all()::contains);
            int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount;
            if (requiredTransientCount > 0)
                contacts.addAll(limit(filter(live.natural(), Replica::isTransient), requiredTransientCount));
            return contacts.asSnapshot();
        }
    };

    /**
     * Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
     * but for the paxos linearisation agreement.
     *
     * This will select all live nodes as the candidates for the operation.  Only the required number of participants
     */
    public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
    {
        Token tk = key.getToken();
        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);

        Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547

        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
        {
            // TODO: we should cleanup our semantics here, as we're filtering ALL nodes to localDC which is unexpected for ReplicaPlan
            // Restrict natural and pending to node in the local DC only
            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
            String localDc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
            Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));

            liveAndDown = liveAndDown.filter(isLocalDc);
        }

        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive);

        // TODO: this should use assureSufficientReplicas
        int participants = liveAndDown.all().size();
        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833

        EndpointsForToken contacts = live.all();
        if (contacts.size() < requiredParticipants)
            throw UnavailableException.create(consistencyForPaxos, requiredParticipants, contacts.size());

        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
        // Note that we fake an impossible number of required nodes in the unavailable exception
        // to nail home the point that it's an impossible operation no matter how many nodes are live.
        if (liveAndDown.pending().size() > 1)
            // Report the number of pending range movements, which is what the message describes
            // (previously this formatted liveAndDown.all().size(), a misleading count).
            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", liveAndDown.pending().size()),
                                           consistencyForPaxos,
                                           participants + 1,
                                           contacts.size());

        return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants);
    }

    /**
     * Construct a plan for reading from a single node - this permits no speculation or read-repair
     */
    public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica)
    {
        EndpointsForToken one = EndpointsForToken.of(token, replica);
        return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE, one, one);
    }

    /**
     * Construct a plan for reading from a single node - this permits no speculation or read-repair
     */
    public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
    {
        // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
        EndpointsForRange one = EndpointsForRange.of(replica);
        return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, range, one, one);
    }

    /**
     * Construct a plan for reading the provided token at the provided consistency level.  This translates to a collection of
     *   - candidates who are: alive, replicate the token, and are sorted by their snitch scores
     *   - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) candidates
     *
     * The candidate collection can be used for speculation, although at present it would break
     * LOCAL_QUORUM and EACH_QUORUM to do so without further filtering
     */
    public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
    {
        ReplicaLayout.ForTokenRead candidates = ReplicaLayout.forTokenReadLiveSorted(keyspace, token);
        EndpointsForToken contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural(),
                retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));

        ReplicaPlan.ForTokenRead result = new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates.natural(), contacts);
        result.assureSufficientReplicas(); // Throw UAE early if we don't have enough replicas.
        return result;
    }

    /**
     * Construct a plan for reading the provided range at the provided consistency level.  This translates to a collection of
     *   - candidates who are: alive, replicate the range, and are sorted by their snitch scores
     *   - contacts who are: the first blockFor candidates
     *
     * There is no speculation for range read queries at present, so we never 'always speculate' here, and a failed response fails the query.
     */
    public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range)
    {
        ReplicaLayout.ForRangeRead candidates = ReplicaLayout.forRangeReadLiveSorted(keyspace, range);
        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural());

        ReplicaPlan.ForRangeRead result = new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates.natural(), contacts);
        result.assureSufficientReplicas();
        return result;
    }

    /**
     * Take two range read plans for adjacent ranges, and check if it is OK (and worthwhile) to combine them into a single plan
     */
    public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right)
    {
        // TODO: should we be asserting that the ranges are adjacent?
        AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right);
        EndpointsForRange mergedCandidates = left.candidates().keep(right.candidates().endpoints());

        // Check if there are enough shared endpoints for the merge to be possible.
        if (!consistencyLevel.isSufficientLiveReplicasForRead(keyspace, mergedCandidates))
            return null;

        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, mergedCandidates);

        // Estimate whether merging will be a win or not
        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
            return null;

        // If we get there, merge this range and the next one
        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts);
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/SystemReplicas.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java index 13a9d74..0d1fc8d 100644 --- a/src/java/org/apache/cassandra/locator/SystemReplicas.java +++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java @@ -18,12 +18,11 @@ package org.apache.cassandra.locator; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.collect.Collections2; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -50,13 +49,8 @@ public class SystemReplicas return systemReplicas.computeIfAbsent(endpoint, SystemReplicas::createSystemReplica); } - public static Collection<Replica> getSystemReplicas(Collection<InetAddressAndPort> endpoints) + public static EndpointsForRange getSystemReplicas(Collection<InetAddressAndPort> endpoints) { - List<Replica> replicas = new ArrayList<>(endpoints.size()); - for (InetAddressAndPort endpoint: endpoints) - { - replicas.add(getSystemReplica(endpoint)); - } - return replicas; + return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 4ab34db..ad40d7b 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -1224,15 +1224,11 @@ public class 
TokenMetadata /** * @deprecated retained for benefit of old tests */ + @Deprecated public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural) { EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName); - if (Endpoints.haveConflicts(natural, pending)) - { - natural = Endpoints.resolveConflictsInNatural(natural, pending); - pending = Endpoints.resolveConflictsInPending(natural, pending); - } - return Endpoints.concat(natural, pending); + return ReplicaLayout.forTokenWrite(natural, pending).all(); } /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/net/IAsyncCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java index 253b412..ceaf072 100644 --- a/src/java/org/apache/cassandra/net/IAsyncCallback.java +++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java @@ -17,12 +17,6 @@ */ package org.apache.cassandra.net; -import com.google.common.base.Predicate; - -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.Replica; - /** * implementors of IAsyncCallback need to make sure that any public methods * are threadsafe with respect to response() being called from the message @@ -31,10 +25,6 @@ import org.apache.cassandra.locator.Replica; */ public interface IAsyncCallback<T> { - final Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive; - - final Predicate<Replica> isReplicaAlive = replica -> isAlive.apply(replica.endpoint()); - /** * @param msg response received. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index e817cc8..7f51ae7 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -26,8 +26,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.stream.Collectors; import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.ReplicaPlan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,6 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.RequestFailureReason; -import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.exceptions.WriteFailureException; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.locator.InetAddressAndPort; @@ -53,7 +53,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW //Count down until all responses and expirations have occured before deciding whether the ideal CL was reached. 
private AtomicInteger responsesAndExpirations; private final SimpleCondition condition = new SimpleCondition(); - protected final ReplicaLayout.ForToken replicaLayout; + protected final ReplicaPlan.ForTokenWrite replicaPlan; protected final Runnable callback; protected final WriteType writeType; @@ -76,12 +76,12 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW * @param callback A callback to be called when the write is successful. * @param queryStartNanoTime */ - protected AbstractWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, + protected AbstractWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, Runnable callback, WriteType writeType, long queryStartNanoTime) { - this.replicaLayout = replicaLayout; + this.replicaPlan = replicaPlan; this.callback = callback; this.writeType = writeType; this.failureReasonByEndpoint = new ConcurrentHashMap<>(); @@ -104,19 +104,19 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW if (!success) { - int blockedFor = totalBlockFor(); + int blockedFor = blockFor(); int acks = ackCount(); // It's pretty unlikely, but we can race between exiting await above and here, so // that we could now have enough acks. In that case, we "lie" on the acks count to // avoid sending confusing info to the user (see CASSANDRA-6491). 
if (acks >= blockedFor) acks = blockedFor - 1; - throw new WriteTimeoutException(writeType, replicaLayout.consistencyLevel(), acks, blockedFor); + throw new WriteTimeoutException(writeType, replicaPlan.consistencyLevel(), acks, blockedFor); } - if (totalBlockFor() + failures > totalEndpoints()) + if (blockFor() + failures > candidateReplicaCount()) { - throw new WriteFailureException(replicaLayout.consistencyLevel(), ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint); + throw new WriteFailureException(replicaPlan.consistencyLevel(), ackCount(), blockFor(), writeType, failureReasonByEndpoint); } } @@ -135,7 +135,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler) { this.idealCLDelegate = handler; - idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaLayout.selected().size()); + idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaPlan.contacts().size()); } /** @@ -189,28 +189,30 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW /** * @return the minimum number of endpoints that must reply. */ - protected int totalBlockFor() + protected int blockFor() { // During bootstrap, we have to include the pending endpoints or we may fail the consistency level // guarantees (see #833) - return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending()); + return replicaPlan.blockFor(); } /** + * TODO: this method is brittle for its purpose of deciding when we should fail a query; + * this needs to be CL aware, and of which nodes are live/down * @return the total number of endpoints the request can been sent to. 
*/ - protected int totalEndpoints() + protected int candidateReplicaCount() { - return replicaLayout.all().size(); + return replicaPlan.liveAndDown().size(); } public ConsistencyLevel consistencyLevel() { - return replicaLayout.consistencyLevel(); + return replicaPlan.consistencyLevel(); } /** - * @return true if the message counts towards the totalBlockFor() threshold + * @return true if the message counts towards the blockFor() threshold */ protected boolean waitingFor(InetAddressAndPort from) { @@ -227,11 +229,6 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW */ public abstract void response(MessageIn<T> msg); - public void assureSufficientLiveNodes() throws UnavailableException - { - replicaLayout.consistencyLevel().assureSufficientLiveNodesForWrite(replicaLayout.keyspace(), replicaLayout.all().filter(isReplicaAlive), replicaLayout.pending()); - } - protected void signal() { condition.signalAll(); @@ -250,7 +247,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW failureReasonByEndpoint.put(from, failureReason); - if (totalBlockFor() + n > totalEndpoints()) + if (blockFor() + n > candidateReplicaCount()) signal(); } @@ -278,11 +275,11 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW //The condition being signaled is a valid proxy for the CL being achieved if (!condition.isSignaled()) { - replicaLayout.keyspace().metric.writeFailedIdealCL.inc(); + replicaPlan.keyspace().metric.writeFailedIdealCL.inc(); } else { - replicaLayout.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime); + replicaPlan.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime); } } } @@ -292,7 +289,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW */ public void maybeTryAdditionalReplicas(IMutation mutation, StorageProxy.WritePerformer writePerformer, String localDC) { - if 
(replicaLayout.all().size() == replicaLayout.selected().size()) + EndpointsForToken uncontacted = replicaPlan.liveUncontacted(); + if (uncontacted.isEmpty()) return; long timeout = Long.MAX_VALUE; @@ -313,7 +311,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW for (ColumnFamilyStore cf : cfs) cf.metric.speculativeWrites.inc(); - writePerformer.apply(mutation, replicaLayout.forNaturalUncontacted(), + writePerformer.apply(mutation, replicaPlan.withContact(uncontacted), (AbstractWriteResponseHandler<IMutation>) this, localDC); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 8ffca6a..b32f67e 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -331,7 +331,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology(); Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints(); Iterable<InetAddressAndPort> dcEndpoints = concat(transform(dataCenters, dcEndpointsMap::get)); - return neighbors.keep(dcEndpoints); + return neighbors.select(dcEndpoints, true); } else if (hosts != null && !hosts.isEmpty()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java index ee74df5..63fbc72 100644 --- 
a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java +++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java @@ -36,7 +36,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime) { - super(wrapped.replicaLayout, wrapped.callback, wrapped.writeType, queryStartNanoTime); + super(wrapped.replicaPlan, wrapped.callback, wrapped.writeType, queryStartNanoTime); this.wrapped = wrapped; this.requiredBeforeFinish = requiredBeforeFinish; this.cleanup = cleanup; @@ -64,24 +64,19 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> wrapped.onFailure(from, failureReason); } - public void assureSufficientLiveNodes() - { - wrapped.assureSufficientLiveNodes(); - } - public void get() throws WriteTimeoutException, WriteFailureException { wrapped.get(); } - protected int totalBlockFor() + protected int blockFor() { - return wrapped.totalBlockFor(); + return wrapped.blockFor(); } - protected int totalEndpoints() + protected int candidateReplicaCount() { - return wrapped.totalEndpoints(); + return wrapped.candidateReplicaCount(); } protected boolean waitingFor(InetAddressAndPort from) http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index d4cdcc6..4c892ff 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -22,10 +22,10 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import 
org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; @@ -40,16 +40,16 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse private final Map<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>(); private final AtomicInteger acks = new AtomicInteger(0); - public DatacenterSyncWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, + public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, Runnable callback, WriteType writeType, long queryStartNanoTime) { // Response is been managed by the map so make it 1 for the superclass. - super(replicaLayout, callback, writeType, queryStartNanoTime); - assert replicaLayout.consistencyLevel() == ConsistencyLevel.EACH_QUORUM; + super(replicaPlan, callback, writeType, queryStartNanoTime); + assert replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM; - NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaLayout.keyspace().getReplicationStrategy(); + NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaPlan.keyspace().getReplicationStrategy(); for (String dc : strategy.getDatacenters()) { @@ -59,7 +59,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse // During bootstrap, we have to include the pending endpoints or we may fail the consistency level // guarantees (see #833) - for (Replica pending : replicaLayout.pending()) + for (Replica pending : replicaPlan.pending()) { responses.get(snitch.getDatacenter(pending)).incrementAndGet(); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index b458a71..a3ef76f 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -19,7 +19,7 @@ package org.apache.cassandra.service; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.net.MessageIn; /** @@ -27,13 +27,13 @@ import org.apache.cassandra.net.MessageIn; */ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> { - public DatacenterWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, + public DatacenterWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, Runnable callback, WriteType writeType, long queryStartNanoTime) { - super(replicaLayout, callback, writeType, queryStartNanoTime); - assert replicaLayout.consistencyLevel().isDatacenterLocal(); + super(replicaPlan, callback, writeType, queryStartNanoTime); + assert replicaPlan.consistencyLevel().isDatacenterLocal(); } @Override @@ -54,6 +54,6 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> @Override protected boolean waitingFor(InetAddressAndPort from) { - return replicaLayout.consistencyLevel().isLocal(from); + return replicaPlan.consistencyLevel().isLocal(from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 5eb43cf..fc49330 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -60,7 +60,6 @@ import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.ViewUtils; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.hints.Hint; import org.apache.cassandra.hints.HintsService; @@ -135,7 +134,7 @@ public class StorageProxy implements StorageProxyMBean standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) -> { assert mutation instanceof Mutation; - sendToHintedReplicas((Mutation) mutation, targets.selected(), responseHandler, localDataCenter, Stage.MUTATION); + sendToHintedReplicas((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION); }; /* @@ -146,17 +145,17 @@ public class StorageProxy implements StorageProxyMBean */ counterWritePerformer = (mutation, targets, responseHandler, localDataCenter) -> { - EndpointsForToken selected = targets.selected().withoutSelf(); + EndpointsForToken selected = targets.contacts().withoutSelf(); Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548 - counterWriteTask(mutation, selected, responseHandler, localDataCenter).run(); + counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter).run(); }; counterWriteOnCoordinatorPerformer = (mutation, targets, responseHandler, localDataCenter) -> { - EndpointsForToken selected = targets.selected().withoutSelf(); + EndpointsForToken selected = targets.contacts().withoutSelf(); Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548 StageManager.getStage(Stage.COUNTER_MUTATION) - .execute(counterWriteTask(mutation, selected, 
responseHandler, localDataCenter)); + .execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter)); }; for(ConsistencyLevel level : ConsistencyLevel.values()) @@ -232,9 +231,9 @@ public class StorageProxy implements StorageProxyMBean while (System.nanoTime() - queryStartNanoTime < timeout) { // for simplicity, we'll do a single liveness check at the start of each attempt - ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos); + ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos); - final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaLayout, consistencyForPaxos, consistencyForCommit, true, state); + final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaPlan, consistencyForPaxos, consistencyForCommit, true, state); final UUID ballot = pair.ballot; contentions += pair.contentions; @@ -276,7 +275,7 @@ public class StorageProxy implements StorageProxyMBean Commit proposal = Commit.newProposal(ballot, updates); Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, replicaLayout, true, queryStartNanoTime)) + if (proposePaxos(proposal, replicaPlan, true, queryStartNanoTime)) { commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime); Tracing.trace("CAS successful"); @@ -334,7 +333,7 @@ public class StorageProxy implements StorageProxyMBean private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime, DecoratedKey key, TableMetadata metadata, - ReplicaLayout.ForPaxos replicaLayout, + ReplicaPlan.ForPaxosWrite paxosPlan, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, final boolean isWrite, @@ -360,7 +359,7 @@ public class StorageProxy implements StorageProxyMBean // prepare 
Tracing.trace("Preparing {}", ballot); Commit toPrepare = Commit.newPrepare(key, metadata, ballot); - summary = preparePaxos(toPrepare, replicaLayout, queryStartNanoTime); + summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime); if (!summary.promised) { Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); @@ -383,7 +382,7 @@ public class StorageProxy implements StorageProxyMBean else casReadMetrics.unfinishedCommit.inc(); Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update); - if (proposePaxos(refreshedInProgress, replicaLayout, false, queryStartNanoTime)) + if (proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime)) { try { @@ -440,12 +439,12 @@ public class StorageProxy implements StorageProxyMBean MessagingService.instance().sendOneWay(message, target); } - private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaLayout.ForPaxos replicaLayout, long queryStartNanoTime) + private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPaxosWrite replicaPlan, long queryStartNanoTime) throws WriteTimeoutException { - PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaLayout.getRequiredParticipants(), replicaLayout.consistencyLevel(), queryStartNanoTime); + PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer); - for (Replica replica: replicaLayout.selected()) + for (Replica replica: replicaPlan.contacts()) { if (replica.isLocal()) { @@ -478,12 +477,12 @@ public class StorageProxy implements StorageProxyMBean return callback; } - private static boolean proposePaxos(Commit proposal, ReplicaLayout.ForPaxos replicaLayout, boolean 
timeoutIfPartial, long queryStartNanoTime) + private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean timeoutIfPartial, long queryStartNanoTime) throws WriteTimeoutException { - ProposeCallback callback = new ProposeCallback(replicaLayout.selected().size(), replicaLayout.getRequiredParticipants(), !timeoutIfPartial, replicaLayout.consistencyLevel(), queryStartNanoTime); + ProposeCallback callback = new ProposeCallback(replicaPlan.contacts().size(), replicaPlan.requiredParticipants(), !timeoutIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer); - for (Replica replica : replicaLayout.selected()) + for (Replica replica : replicaPlan.contacts()) { if (replica.isLocal()) { @@ -518,7 +517,7 @@ public class StorageProxy implements StorageProxyMBean return true; if (timeoutIfPartial && !callback.isFullyRefused()) - throw new WriteTimeoutException(WriteType.CAS, replicaLayout.consistencyLevel(), callback.getAcceptCount(), replicaLayout.getRequiredParticipants()); + throw new WriteTimeoutException(WriteType.CAS, replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants()); return false; } @@ -531,21 +530,22 @@ public class StorageProxy implements StorageProxyMBean Token tk = proposal.update.partitionKey().getToken(); AbstractWriteResponseHandler<Commit> responseHandler = null; - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive); + // NOTE: this ReplicaPlan is a lie, this usage of ReplicaPlan could do with being clarified - the selected() collection is essentially (I think) never used + ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll); if (shouldBlock) { AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - 
responseHandler = rs.getWriteResponseHandler(replicaLayout, null, WriteType.SIMPLE, queryStartNanoTime); + responseHandler = rs.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, queryStartNanoTime); responseHandler.setSupportsBackPressure(false); } - MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); - for (Replica replica : replicaLayout.all()) + MessageOut<Commit> message = new MessageOut<>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); + for (Replica replica : replicaPlan.liveAndDown()) { InetAddressAndPort destination = replica.endpoint(); checkHintOverload(replica); - if (FailureDetector.instance.isAlive(destination)) + if (replicaPlan.isAlive(replica)) { if (shouldBlock) { @@ -616,10 +616,10 @@ public class StorageProxy implements StorageProxyMBean * the data across to some other replica. * * @param mutations the mutations to be applied across the replicas - * @param consistency_level the consistency level for the operation + * @param consistencyLevel the consistency level for the operation * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed */ - public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime) + public static void mutate(List<? 
extends IMutation> mutations, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException { Tracing.trace("Determining replicas for mutation"); @@ -637,7 +637,7 @@ public class StorageProxy implements StorageProxyMBean if (mutation instanceof CounterMutation) responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime)); else - responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime)); + responseHandlers.add(performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime)); } // upgrade to full quorum any failed cheap quorums @@ -653,7 +653,7 @@ public class StorageProxy implements StorageProxyMBean } catch (WriteTimeoutException|WriteFailureException ex) { - if (consistency_level == ConsistencyLevel.ANY) + if (consistencyLevel == ConsistencyLevel.ANY) { hintMutations(mutations); } @@ -662,7 +662,7 @@ public class StorageProxy implements StorageProxyMBean if (ex instanceof WriteFailureException) { writeMetrics.failures.mark(); - writeMetricsMap.get(consistency_level).failures.mark(); + writeMetricsMap.get(consistencyLevel).failures.mark(); WriteFailureException fe = (WriteFailureException)ex; Tracing.trace("Write failure; received {} of {} required replies, failed {} requests", fe.received, fe.blockFor, fe.failureReasonByEndpoint.size()); @@ -670,7 +670,7 @@ public class StorageProxy implements StorageProxyMBean else { writeMetrics.timeouts.mark(); - writeMetricsMap.get(consistency_level).timeouts.mark(); + writeMetricsMap.get(consistencyLevel).timeouts.mark(); WriteTimeoutException te = (WriteTimeoutException)ex; Tracing.trace("Write timeout; received {} of {} required replies", te.received, te.blockFor); } @@ -680,14 +680,14 @@ public class StorageProxy implements StorageProxyMBean catch 
(UnavailableException e) { writeMetrics.unavailables.mark(); - writeMetricsMap.get(consistency_level).unavailables.mark(); + writeMetricsMap.get(consistencyLevel).unavailables.mark(); Tracing.trace("Unavailable"); throw e; } catch (OverloadedException e) { writeMetrics.unavailables.mark(); - writeMetricsMap.get(consistency_level).unavailables.mark(); + writeMetricsMap.get(consistencyLevel).unavailables.mark(); Tracing.trace("Overloaded"); throw e; } @@ -695,7 +695,7 @@ public class StorageProxy implements StorageProxyMBean { long latency = System.nanoTime() - startTime; writeMetrics.addNano(latency); - writeMetricsMap.get(consistency_level).addNano(latency); + writeMetricsMap.get(consistencyLevel).addNano(latency); updateCoordinatorWriteLatencyTableMetric(mutations, latency); } } @@ -725,7 +725,8 @@ public class StorageProxy implements StorageProxyMBean // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510), // so there is no need to hint or retry. 
- EndpointsForToken replicasToHint = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token) + EndpointsForToken replicasToHint = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token) + .all() .filter(StorageProxy::shouldHint); submitHint(mutation, replicasToHint, null); @@ -737,8 +738,8 @@ public class StorageProxy implements StorageProxyMBean Token token = mutation.key().getToken(); InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - return StorageService.instance.getNaturalReplicasForToken(keyspaceName, token).endpoints().contains(local) - || StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspaceName).endpoints().contains(local); + return ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token) + .all().endpoints().contains(local); } /** @@ -934,7 +935,6 @@ public class StorageProxy implements StorageProxyMBean cleanup, queryStartNanoTime); // exit early if we can't fulfill the CL at this time. 
- wrapper.handler.assureSufficientLiveNodes(); wrappers.add(wrapper); } @@ -1002,14 +1002,14 @@ public class StorageProxy implements StorageProxyMBean throws WriteTimeoutException, WriteFailureException { Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forBatchlogWrite(systemKeypsace, endpoints); - WriteResponseHandler<?> handler = new WriteResponseHandler(replicaLayout, + ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints); + WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan, WriteType.BATCH_LOG, queryStartNanoTime); Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations); MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer); - for (Replica replica : replicaLayout.all()) + for (Replica replica : replicaPlan.liveAndDown()) { logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size()); @@ -1040,12 +1040,12 @@ public class StorageProxy implements StorageProxyMBean { for (WriteResponseHandlerWrapper wrapper : wrappers) { - Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549 - ReplicaLayout.ForToken replicas = wrapper.handler.replicaLayout.withSelected(wrapper.handler.replicaLayout.all()); + Replicas.temporaryAssertFull(wrapper.handler.replicaPlan.liveAndDown()); // TODO: CASSANDRA-14549 + ReplicaPlan.ForTokenWrite replicas = wrapper.handler.replicaPlan.withContact(wrapper.handler.replicaPlan.liveAndDown()); try { - sendToHintedReplicas(wrapper.mutation, replicas.selected(), wrapper.handler, localDataCenter, stage); + sendToHintedReplicas(wrapper.mutation, replicas, wrapper.handler, localDataCenter, stage); } catch (OverloadedException | WriteTimeoutException e) { @@ -1059,11 +1059,11 @@ public class StorageProxy implements StorageProxyMBean { for 
(WriteResponseHandlerWrapper wrapper : wrappers) { - Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549 - sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaLayout.all(), wrapper.handler, localDataCenter, stage); + EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown(); + Replicas.temporaryAssertFull(sendTo); // TODO: CASSANDRA-14549 + sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaPlan.withContact(sendTo), wrapper.handler, localDataCenter, stage); } - for (WriteResponseHandlerWrapper wrapper : wrappers) wrapper.handler.get(); } @@ -1096,13 +1096,10 @@ public class StorageProxy implements StorageProxyMBean Token tk = mutation.key().getToken(); - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk); - AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime); + ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal); + AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime); - // exit early if we can't fulfill the CL at this time - responseHandler.assureSufficientLiveNodes(); - - performer.apply(mutation, replicaLayout, responseHandler, localDataCenter); + performer.apply(mutation, replicaPlan, responseHandler, localDataCenter); return responseHandler; } @@ -1118,8 +1115,8 @@ public class StorageProxy implements StorageProxyMBean AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); Token tk = mutation.key().getToken(); - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive); - AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout,null, writeType, queryStartNanoTime); + 
ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal); + AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan,null, writeType, queryStartNanoTime); BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime); return new WriteResponseHandlerWrapper(batchHandler, mutation); } @@ -1140,10 +1137,11 @@ public class StorageProxy implements StorageProxyMBean { Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - Token tk = mutation.key().getToken(); - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk, naturalEndpoints, pendingEndpoints); - AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout, () -> { + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints); + ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown, ReplicaPlans.writeAll); + + AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan, () -> { long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get()); viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS); }, writeType, queryStartNanoTime); @@ -1208,7 +1206,7 @@ public class StorageProxy implements StorageProxyMBean * @throws OverloadedException if the hints cannot be written/enqueued */ public static void sendToHintedReplicas(final Mutation mutation, - EndpointsForToken targets, + ReplicaPlan.ForTokenWrite plan, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, Stage stage) @@ -1227,11 +1225,11 @@ public class StorageProxy implements StorageProxyMBean List<InetAddressAndPort> 
backPressureHosts = null; - for (Replica destination : targets) + for (Replica destination : plan.contacts()) { checkHintOverload(destination); - if (FailureDetector.instance.isAlive(destination.endpoint())) + if (plan.isAlive(destination)) { if (destination.isLocal()) { @@ -1251,7 +1249,7 @@ public class StorageProxy implements StorageProxyMBean if (localDataCenter.equals(dc)) { if (localDc == null) - localDc = new ArrayList<>(targets.size()); + localDc = new ArrayList<>(plan.contacts().size()); localDc.add(destination); } @@ -1268,7 +1266,7 @@ public class StorageProxy implements StorageProxyMBean } if (backPressureHosts == null) - backPressureHosts = new ArrayList<>(targets.size()); + backPressureHosts = new ArrayList<>(plan.contacts().size()); backPressureHosts.add(destination.endpoint()); } @@ -1345,7 +1343,7 @@ public class StorageProxy implements StorageProxyMBean message, destination, message.getTimeout(), - handler.replicaLayout.consistencyLevel(), + handler.replicaPlan.consistencyLevel(), true); messageIds[idIdx++] = id; logger.trace("Adding FWD message to {}@{}", id, destination); @@ -1435,14 +1433,13 @@ public class StorageProxy implements StorageProxyMBean // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica String keyspaceName = cm.getKeyspaceName(); Keyspace keyspace = Keyspace.open(keyspaceName); - AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy(); Token tk = cm.key().getToken(); - ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, cm.consistency(), tk); - rs.getWriteResponseHandler(replicaLayout, null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes(); + ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll) + .assureSufficientReplicas(); // Forward the actual update to the chosen leader replica - AbstractWriteResponseHandler<IMutation> responseHandler = new 
WriteResponseHandler<>(ReplicaLayout.forCounterWrite(keyspace, tk, replica), + AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica), WriteType.COUNTER, queryStartNanoTime); Tracing.trace("Enqueuing counter update to {}", replica); @@ -1465,7 +1462,7 @@ public class StorageProxy implements StorageProxyMBean { Keyspace keyspace = Keyspace.open(keyspaceName); IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - EndpointsForToken replicas = StorageService.instance.getLiveNaturalReplicasForToken(keyspace, key); + EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(key); // CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint())); @@ -1511,7 +1508,7 @@ public class StorageProxy implements StorageProxyMBean } private static Runnable counterWriteTask(final IMutation mutation, - final EndpointsForToken targets, + final ReplicaPlan.ForTokenWrite replicaPlan, final AbstractWriteResponseHandler<IMutation> responseHandler, final String localDataCenter) { @@ -1524,7 +1521,7 @@ public class StorageProxy implements StorageProxyMBean Mutation result = ((CounterMutation) mutation).applyCounterMutation(); responseHandler.response(null); - sendToHintedReplicas(result, targets, responseHandler, localDataCenter, Stage.COUNTER_MUTATION); + sendToHintedReplicas(result, replicaPlan, responseHandler, localDataCenter, Stage.COUNTER_MUTATION); } }; } @@ -1592,7 +1589,7 @@ public class StorageProxy implements StorageProxyMBean try { // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read - ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel); + ReplicaPlan.ForPaxosWrite 
replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel); // does the work of applying in-progress writes; throws UAE or timeout if it can't final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL @@ -1601,7 +1598,7 @@ public class StorageProxy implements StorageProxyMBean try { - final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaLayout, consistencyLevel, consistencyForCommitOrFetch, false, state); + final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaPlan, consistencyLevel, consistencyForCommitOrFetch, false, state); if (pair.contentions > 0) casReadMetrics.contention.update(pair.contentions); } @@ -1850,21 +1847,6 @@ public class StorageProxy implements StorageProxyMBean } } - public static EndpointsForToken getLiveSortedReplicasForToken(Keyspace keyspace, RingPosition pos) - { - return getLiveSortedReplicas(keyspace, pos).forToken(pos.getToken()); - } - - public static EndpointsForRange getLiveSortedReplicas(Keyspace keyspace, RingPosition pos) - { - EndpointsForRange liveReplicas = StorageService.instance.getLiveNaturalReplicas(keyspace, pos); - // Replica availability is considered by the query path - Preconditions.checkState(liveReplicas.isEmpty() || liveReplicas.stream().anyMatch(Replica::isFull), - "At least one full replica required for reads: " + liveReplicas); - - return DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), liveReplicas); - } - /** * Estimate the number of result rows per range in the ring based on our local data. 
* <p> @@ -1883,7 +1865,7 @@ public class StorageProxy implements StorageProxyMBean return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas; } - private static class RangeIterator extends AbstractIterator<ReplicaLayout.ForRange> + private static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead> { private final Keyspace keyspace; private final ConsistencyLevel consistency; @@ -1907,43 +1889,34 @@ public class StorageProxy implements StorageProxyMBean return rangeCount; } - protected ReplicaLayout.ForRange computeNext() + protected ReplicaPlan.ForRangeRead computeNext() { if (!ranges.hasNext()) return endOfData(); - AbstractBounds<PartitionPosition> range = ranges.next(); - EndpointsForRange liveReplicas = getLiveSortedReplicas(keyspace, range.right); - - int blockFor = consistency.blockFor(keyspace); - EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas); - int minResponses = Math.min(targetReplicas.size(), blockFor); - - // Endpoints for range here as well - return ReplicaLayout.forRangeRead(keyspace, consistency, range, - liveReplicas, targetReplicas.subList(0, minResponses)); + return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next()); } } - private static class RangeMerger extends AbstractIterator<ReplicaLayout.ForRange> + private static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead> { private final Keyspace keyspace; private final ConsistencyLevel consistency; - private final PeekingIterator<ReplicaLayout.ForRange> ranges; + private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges; - private RangeMerger(Iterator<ReplicaLayout.ForRange> iterator, Keyspace keyspace, ConsistencyLevel consistency) + private RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency) { this.keyspace = keyspace; this.consistency = consistency; this.ranges = 
Iterators.peekingIterator(iterator); } - protected ReplicaLayout.ForRange computeNext() + protected ReplicaPlan.ForRangeRead computeNext() { if (!ranges.hasNext()) return endOfData(); - ReplicaLayout.ForRange current = ranges.next(); + ReplicaPlan.ForRangeRead current = ranges.next(); // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges @@ -1955,25 +1928,15 @@ public class StorageProxy implements StorageProxyMBean // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking // wire compatibility, so It's likely easier not to bother; - if (current.range.right.isMinimum()) + if (current.range().right.isMinimum()) break; - ReplicaLayout.ForRange next = ranges.peek(); - - EndpointsForRange merged = current.all().keep(next.all().endpoints()); - - // Check if there is enough endpoint for the merge to be possible. 
- if (!consistency.isSufficientLiveNodesForRead(keyspace, merged)) - break; - - EndpointsForRange filteredMerged = consistency.filterForQuery(keyspace, merged); - - // Estimate whether merging will be a win or not - if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.selected(), next.selected())) + ReplicaPlan.ForRangeRead next = ranges.peek(); + ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, next); + if (merged == null) break; - // If we get there, merge this range and the next one - current = ReplicaLayout.forRangeRead(keyspace, consistency, current.range.withNewRight(next.range.right), merged, filteredMerged); + current = merged; ranges.next(); // consume the range we just merged since we've only peeked so far } return current; @@ -2018,7 +1981,7 @@ public class StorageProxy implements StorageProxyMBean private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator { - private final Iterator<ReplicaLayout.ForRange> ranges; + private final Iterator<ReplicaPlan.ForRangeRead> ranges; private final int totalRangeCount; private final PartitionRangeReadCommand command; private final boolean enforceStrictLiveness; @@ -2108,42 +2071,40 @@ public class StorageProxy implements StorageProxyMBean /** * Queries the provided sub-range. * - * @param replicaLayout the subRange to query. + * @param replicaPlan the subRange to query. * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on * that batch or not. The reason it matters is that whe paging queries, the command (more specifically the * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in * that it's the query that "continues" whatever we're previously queried). 
*/ - private SingleRangeResponse query(ReplicaLayout.ForRange replicaLayout, boolean isFirst) + private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst) { - PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaLayout.range, isFirst); - ReadRepair<EndpointsForRange, ReplicaLayout.ForRange> readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime); - DataResolver<EndpointsForRange, ReplicaLayout.ForRange> resolver = new DataResolver<>(rangeCommand, replicaLayout, readRepair, queryStartNanoTime); - Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - - ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver, - replicaLayout.consistencyLevel().blockFor(keyspace), - rangeCommand, - replicaLayout, - queryStartNanoTime); + PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst); + ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan); + ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair + = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime); + DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver + = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime); + ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler + = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime); - handler.assureSufficientLiveNodes(); + replicaPlan.assureSufficientReplicas(); // If enabled, request repaired data tracking info from full replicas but // only if there are multiple full replicas to compare results from if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled() - && replicaLayout.selected().filter(Replica::isFull).size() > 1) + && replicaPlan.contacts().filter(Replica::isFull).size() > 1) { command.trackRepairedStatus(); } - if (replicaLayout.selected().size() == 1 && 
replicaLayout.selected().get(0).isLocal()) + if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal()) { StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler)); } else { - for (Replica replica : replicaLayout.selected()) + for (Replica replica : replicaPlan.contacts()) { Tracing.trace("Enqueuing request to {}", replica); PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(); @@ -2486,7 +2447,7 @@ public class StorageProxy implements StorageProxyMBean public interface WritePerformer { public void apply(IMutation mutation, - ReplicaLayout.ForToken targets, + ReplicaPlan.ForTokenWrite targets, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter) throws OverloadedException; } @@ -2499,7 +2460,7 @@ public class StorageProxy implements StorageProxyMBean public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime) { super(writeHandler, i, cleanup, queryStartNanoTime); - viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints()); + viewWriteMetrics.viewReplicasAttempted.inc(candidateReplicaCount()); } public void response(MessageIn<IMutation> msg) @@ -2705,7 +2666,7 @@ public class StorageProxy implements StorageProxyMBean HintsService.instance.write(hostIds, Hint.create(mutation, System.currentTimeMillis())); validTargets.forEach(HintsService.instance.metrics::incrCreatedHints); // Notify the handler only for CL == ANY - if (responseHandler != null && responseHandler.replicaLayout.consistencyLevel() == ConsistencyLevel.ANY) + if (responseHandler != null && responseHandler.replicaPlan.consistencyLevel() == ConsistencyLevel.ANY) responseHandler.response(null); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 7f4ae14..a979f1c 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -172,7 +172,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public RangesAtEndpoint getLocalReplicas(String keyspaceName) { - return getReplicasForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort()); + return Keyspace.open(keyspaceName).getReplicationStrategy() + .getAddressReplicas(FBUtilities.getBroadcastAddressAndPort()); } public List<Range<Token>> getLocalAndPendingRanges(String ks) @@ -2015,11 +2016,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ private EndpointsByRange constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges) { + AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); Map<Range<Token>, EndpointsForRange> rangeToEndpointMap = new HashMap<>(ranges.size()); for (Range<Token> range : ranges) - { - rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalReplicas(range.right)); - } + rangeToEndpointMap.put(range, strategy.getNaturalReplicas(range.right)); return new EndpointsByRange(rangeToEndpointMap); } @@ -3878,16 +3878,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } /** - * Get all ranges an endpoint is responsible for (by keyspace) - * @param ep endpoint we are interested in. - * @return ranges for the specified endpoint. - */ - RangesAtEndpoint getReplicasForEndpoint(String keyspaceName, InetAddressAndPort ep) - { - return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressReplicas(ep); - } - - /** * Get all ranges that span the ring given a set * of tokens. All ranges are in sorted order of * ranges. 
@@ -3936,11 +3926,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, cf, key), true); } - @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key) { - EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key)); + EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key); List<InetAddress> inetList = new ArrayList<>(replicas.size()); replicas.forEach(r -> inetList.add(r.endpoint().address)); return inetList; @@ -3948,7 +3937,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key) { - return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key)), true); + EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key); + return Replicas.stringify(replicas, true); } public List<String> getReplicas(String keyspaceName, String cf, String key) @@ -3971,61 +3961,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (metadata == null) throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'"); - return getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))); - } - - /** - * This method returns the N endpoints that are responsible for storing the - * specified key i.e for replication. 
- * - * @param keyspaceName keyspace name also known as keyspace - * @param pos position for which we need to find the endpoint - * @return the endpoint responsible for this token - */ - public static EndpointsForToken getNaturalReplicasForToken(String keyspaceName, RingPosition pos) - { - return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(pos); + return getNaturalReplicasForToken(keyspaceName, metadata.partitionKeyType.fromString(key)); } - /** - * Returns the endpoints currently responsible for storing the token plus pending ones - */ - public EndpointsForToken getNaturalAndPendingReplicasForToken(String keyspaceName, Token token) - { - // TODO: race condition to fetch these. impliciations?? - EndpointsForToken natural = getNaturalReplicasForToken(keyspaceName, token); - EndpointsForToken pending = tokenMetadata.pendingEndpointsForToken(token, keyspaceName); - if (Endpoints.haveConflicts(natural, pending)) - { - natural = Endpoints.resolveConflictsInNatural(natural, pending); - pending = Endpoints.resolveConflictsInPending(natural, pending); - } - return Endpoints.concat(natural, pending); - } - - /** - * This method attempts to return N endpoints that are responsible for storing the - * specified key i.e for replication. - * - * @param keyspace keyspace name also known as keyspace - * @param pos position for which we need to find the endpoint - */ - public EndpointsForToken getLiveNaturalReplicasForToken(Keyspace keyspace, RingPosition pos) - { - return getLiveNaturalReplicas(keyspace, pos).forToken(pos.getToken()); - } - - /** - * This method attempts to return N endpoints that are responsible for storing the - * specified key i.e for replication. 
- * - * @param keyspace keyspace name also known as keyspace - * @param pos position for which we need to find the endpoint - */ - public EndpointsForRange getLiveNaturalReplicas(Keyspace keyspace, RingPosition pos) + public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key) { - EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(pos); - return replicas.filter(r -> FailureDetector.instance.isAlive(r.endpoint())); + Token token = tokenMetadata.partitioner.getToken(key); + return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token); } public void setLoggingLevel(String classQualifier, String rawLevel) throws Exception @@ -4268,7 +4210,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE .filter(endpoint -> FailureDetector.instance.isAlive(endpoint) && !FBUtilities.getBroadcastAddressAndPort().equals(endpoint)) .collect(Collectors.toList()); - return EndpointsForRange.copyOf(SystemReplicas.getSystemReplicas(endpoints)); + return SystemReplicas.getSystemReplicas(endpoints); } /** * Find the best target to stream hints to. 
Currently the closest peer according to the snitch http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index a07aae6..f9bfedf 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -19,9 +19,7 @@ package org.apache.cassandra.service; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.ReplicaLayout; - +import org.apache.cassandra.locator.ReplicaPlan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,18 +37,18 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses"); - public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, + public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, Runnable callback, WriteType writeType, long queryStartNanoTime) { - super(replicaLayout, callback, writeType, queryStartNanoTime); - responses = totalBlockFor(); + super(replicaPlan, callback, writeType, queryStartNanoTime); + responses = blockFor(); } - public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, WriteType writeType, long queryStartNanoTime) + public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType writeType, long queryStartNanoTime) { - this(replicaLayout, null, writeType, queryStartNanoTime); + this(replicaPlan, null, writeType, queryStartNanoTime); } public void response(MessageIn<T> m) @@ -65,7 +63,7 @@ public class WriteResponseHandler<T> extends 
AbstractWriteResponseHandler<T> protected int ackCount() { - return totalBlockFor() - responses; + return blockFor() - responses; } public boolean isLatencyForSnitch() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org