ReplicaPlan/Layout refactor follow-up/completion. Finish much of the work to clarify endpoint selection that was begun in Transient Replication (CASSANDRA-14404).
Also fixes: - commitPaxos was incorrectly selecting only live nodes, when it needed to include down nodes - We were not writing to pending transient replicas - On write, we were not hinting to full nodes with transient replication - rr.maybeSendAdditional{Reads,Writes} would only consult the same node we may have speculated a read to - transient->full movements mishandled consistency level upgrade by retaining the 'full' pending variant, which increased the CL requirement; instead, the 'natural' replica is upgraded to 'full' for writes. Patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for CASSANDRA-14705 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/047bcd7a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/047bcd7a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/047bcd7a Branch: refs/heads/trunk Commit: 047bcd7ad171d6a4aa89128c5e6c6ed5f012b1c0 Parents: 05dbb3e Author: Benedict Elliott Smith <bened...@apple.com> Authored: Fri Sep 7 11:41:28 2018 +0100 Committer: Benedict Elliott Smith <bened...@apple.com> Committed: Fri Sep 14 10:14:37 2018 +0100 ---------------------------------------------------------------------- .../cassandra/batchlog/BatchlogManager.java | 64 ++- .../apache/cassandra/db/ConsistencyLevel.java | 26 +- .../apache/cassandra/gms/FailureDetector.java | 4 + .../apache/cassandra/hints/HintsService.java | 4 +- .../locator/AbstractReplicaCollection.java | 67 +-- .../locator/AbstractReplicationStrategy.java | 20 +- .../org/apache/cassandra/locator/Endpoints.java | 59 +-- .../cassandra/locator/EndpointsForRange.java | 2 +- .../cassandra/locator/EndpointsForToken.java | 2 +- .../cassandra/locator/RangesAtEndpoint.java | 2 +- .../cassandra/locator/ReplicaCollection.java | 27 +- .../apache/cassandra/locator/ReplicaLayout.java | 435 ++++++++----------- .../apache/cassandra/locator/ReplicaPlan.java | 240 ++++++++++ 
.../apache/cassandra/locator/ReplicaPlans.java | 295 +++++++++++++ .../cassandra/locator/SystemReplicas.java | 12 +- .../apache/cassandra/locator/TokenMetadata.java | 8 +- .../apache/cassandra/net/IAsyncCallback.java | 10 - .../service/AbstractWriteResponseHandler.java | 50 +-- .../cassandra/service/ActiveRepairService.java | 2 +- .../service/BatchlogResponseHandler.java | 15 +- .../DatacenterSyncWriteResponseHandler.java | 12 +- .../service/DatacenterWriteResponseHandler.java | 10 +- .../apache/cassandra/service/StorageProxy.java | 239 +++++----- .../cassandra/service/StorageService.java | 82 +--- .../cassandra/service/WriteResponseHandler.java | 16 +- .../service/reads/AbstractReadExecutor.java | 78 ++-- .../cassandra/service/reads/DataResolver.java | 26 +- .../cassandra/service/reads/DigestResolver.java | 22 +- .../cassandra/service/reads/ReadCallback.java | 62 ++- .../service/reads/ResponseResolver.java | 22 +- .../reads/ShortReadPartitionsProtection.java | 19 +- .../reads/repair/AbstractReadRepair.java | 42 +- .../reads/repair/BlockingPartitionRepair.java | 29 +- .../reads/repair/BlockingReadRepair.java | 25 +- .../service/reads/repair/NoopReadRepair.java | 11 +- .../repair/PartitionIteratorMergeListener.java | 16 +- .../reads/repair/ReadOnlyReadRepair.java | 13 +- .../service/reads/repair/ReadRepair.java | 17 +- .../reads/repair/ReadRepairDiagnostics.java | 11 +- .../service/reads/repair/ReadRepairEvent.java | 2 +- .../reads/repair/ReadRepairStrategy.java | 11 +- .../reads/repair/RowIteratorMergeListener.java | 21 +- .../locator/ReplicaCollectionTest.java | 89 ++-- .../service/WriteResponseHandlerTest.java | 4 +- .../WriteResponseHandlerTransientTest.java | 71 ++- .../service/reads/DataResolverTest.java | 21 +- .../reads/DataResolverTransientTest.java | 227 ++++++++++ .../service/reads/DigestResolverTest.java | 5 +- .../service/reads/ReadExecutorTest.java | 7 +- .../reads/repair/AbstractReadRepairTest.java | 32 +- .../reads/repair/BlockingReadRepairTest.java 
| 36 +- .../DiagEventsBlockingReadRepairTest.java | 26 +- .../reads/repair/InstrumentedReadRepair.java | 4 +- .../reads/repair/ReadOnlyReadRepairTest.java | 24 +- .../service/reads/repair/ReadRepairTest.java | 12 +- .../reads/repair/TestableReadRepair.java | 18 +- 56 files changed, 1655 insertions(+), 1051 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 8dda54e..77f725c 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -26,12 +26,9 @@ import java.util.concurrent.*; import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicates; import com.google.common.collect.*; import com.google.common.util.concurrent.RateLimiter; -import org.apache.cassandra.locator.ReplicaLayout; -import org.apache.cassandra.locator.EndpointsForToken; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +51,8 @@ import org.apache.cassandra.hints.Hint; import org.apache.cassandra.hints.HintsService; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -459,36 +458,34 @@ public class BatchlogManager implements BatchlogManagerMBean Keyspace keyspace = Keyspace.open(ks); Token tk = mutation.key().getToken(); - EndpointsForToken 
replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk); - Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549 + // TODO: this logic could do with revisiting at some point, as it is unclear what its rationale is + // we perform a local write, ignoring errors and inline in this thread (potentially slowing replay down) + // effectively bumping CL for locally owned writes and also potentially stalling log replay if an error occurs + // once we decide how it should work, it can also probably be simplified, and avoid constructing a ReplicaPlan directly + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk); + Replicas.temporaryAssertFull(liveAndDown.all()); // TODO in CASSANDRA-14549 + + Replica selfReplica = liveAndDown.all().selfIfPresent(); + if (selfReplica != null) + mutation.apply(); - EndpointsForToken.Builder liveReplicasBuilder = EndpointsForToken.builder(tk); - for (Replica replica : replicas) + ReplicaLayout.ForTokenWrite liveRemoteOnly = liveAndDown.filter( + r -> FailureDetector.isReplicaAlive.test(r) && r != selfReplica); + + for (Replica replica : liveAndDown.all()) { - if (replica.isLocal()) - { - mutation.apply(); - } - else if (FailureDetector.instance.isAlive(replica.endpoint())) - { - liveReplicasBuilder.add(replica); // will try delivering directly instead of writing a hint. 
- } - else - { - hintedNodes.add(replica.endpoint()); - HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()), - Hint.create(mutation, writtenAt)); - } + if (replica == selfReplica || liveRemoteOnly.all().contains(replica)) + continue; + hintedNodes.add(replica.endpoint()); + HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()), + Hint.create(mutation, writtenAt)); } - EndpointsForToken liveReplicas = liveReplicasBuilder.build(); - if (liveReplicas.isEmpty()) - return null; - - Replicas.temporaryAssertFull(liveReplicas); - ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(keyspace, liveReplicas, System.nanoTime()); + ReplicaPlan.ForTokenWrite replicaPlan = new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, + liveRemoteOnly.pending(), liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all()); + ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replicaPlan, System.nanoTime()); MessageOut<Mutation> message = mutation.createMessage(); - for (Replica replica : liveReplicas) + for (Replica replica : liveRemoteOnly.all()) MessagingService.instance().sendWriteRR(message, replica, handler, false); return handler; } @@ -509,17 +506,16 @@ public class BatchlogManager implements BatchlogManagerMBean { private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>()); - ReplayWriteResponseHandler(Keyspace keyspace, EndpointsForToken writeReplicas, long queryStartNanoTime) + ReplayWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, long queryStartNanoTime) { - super(ReplicaLayout.forWriteWithDownNodes(keyspace, null, writeReplicas.token(), writeReplicas, EndpointsForToken.empty(writeReplicas.token())), - null, WriteType.UNLOGGED_BATCH, queryStartNanoTime); - Iterables.addAll(undelivered, writeReplicas.endpoints()); + super(replicaPlan, null, WriteType.UNLOGGED_BATCH, 
queryStartNanoTime); + Iterables.addAll(undelivered, replicaPlan.contacts().endpoints()); } @Override - protected int totalBlockFor() + protected int blockFor() { - return this.replicaLayout.selected().size(); + return this.replicaPlan.contacts().size(); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 35ba198..5a4baf7 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -251,16 +251,10 @@ public enum ConsistencyLevel if (this == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) return filterForEachQuorum(keyspace, liveReplicas); - /* - * Endpoints are expected to be restricted to live replicas, sorted by snitch preference. - * For LOCAL_QUORUM, move local-DC replicas in front first as we need them there whether - * we do read repair (since the first replica gets the data read) or not (since we'll take - * the blockFor first ones). - */ - if (isDCLocal) - liveReplicas = liveReplicas.sorted(DatabaseDescriptor.getLocalComparator()); - - return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace) + (alwaysSpeculate ? 1 : 0))); + int count = blockFor(keyspace) + (alwaysSpeculate ? 1 : 0); + return isDCLocal + ? 
liveReplicas.filter(ConsistencyLevel::isLocal, count) + : liveReplicas.subList(0, Math.min(liveReplicas.size(), count)); } private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, E liveReplicas) @@ -285,7 +279,7 @@ public enum ConsistencyLevel }); } - public boolean isSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas) + public boolean isSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas) { switch (this) { @@ -316,15 +310,15 @@ public enum ConsistencyLevel } } - public void assureSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException + public void assureSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException { - assureSufficientLiveNodes(keyspace, liveReplicas, blockFor(keyspace), 1); + assureSufficientLiveReplicas(keyspace, liveReplicas, blockFor(keyspace), 1); } - public void assureSufficientLiveNodesForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException + public void assureSufficientLiveReplicasForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException { - assureSufficientLiveNodes(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0); + assureSufficientLiveReplicas(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0); } - public void assureSufficientLiveNodes(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException + void assureSufficientLiveReplicas(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException { switch (this) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java 
b/src/java/org/apache/cassandra/gms/FailureDetector.java index e567b7b..d7f73ab 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -27,11 +27,13 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.*; +import org.apache.cassandra.locator.Replica; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +73,8 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean } public static final IFailureDetector instance = new FailureDetector(); + public static final Predicate<InetAddressAndPort> isEndpointAlive = instance::isAlive; + public static final Predicate<Replica> isReplicaAlive = r -> isEndpointAlive.test(r.endpoint()); // this is useless except to provide backwards compatibility in phi_convict_threshold, // because everyone seems pretty accustomed to the default of 8, and users who have http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java index c6ad3d9..73840d3 100644 --- a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -34,6 +34,8 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.locator.ReplicaLayout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -185,7 +187,7 
@@ public final class HintsService implements HintsServiceMBean String keyspaceName = hint.mutation.getKeyspaceName(); Token token = hint.mutation.key().getToken(); - EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token); + EndpointsForToken replicas = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token).all(); // judicious use of streams: eagerly materializing probably cheaper // than performing filters / translations 2x extra via Iterables.filter/transform http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java index 6a7a4ff..94ff991 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java @@ -75,7 +75,6 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect */ public abstract Mutable<C> newMutable(int initialCapacity); - public C snapshot() { return isSnapshot ? 
self() @@ -83,6 +82,7 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect : new ArrayList<>(list)); } + /** see {@link ReplicaCollection#subList(int, int)}*/ public final C subList(int start, int end) { List<Replica> subList; @@ -100,11 +100,23 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect return snapshot(subList); } + /** see {@link ReplicaCollection#count(Predicate)}*/ + public int count(Predicate<Replica> predicate) + { + int count = 0; + for (int i = 0 ; i < list.size() ; ++i) + if (predicate.test(list.get(i))) + ++count; + return count; + } + + /** see {@link ReplicaCollection#filter(Predicate)}*/ public final C filter(Predicate<Replica> predicate) { return filter(predicate, Integer.MAX_VALUE); } + /** see {@link ReplicaCollection#filter(Predicate, int)}*/ public final C filter(Predicate<Replica> predicate, int limit) { if (isEmpty()) @@ -148,53 +160,8 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect return snapshot(copy); } - public final class Select - { - private final List<Replica> result; - public Select(int expectedSize) - { - this.result = new ArrayList<>(expectedSize); - } - - /** - * Add matching replica to the result; this predicate should be mutually exclusive with all prior predicates. - * Stop once we have targetSize replicas in total, including preceding calls - */ - public Select add(Predicate<Replica> predicate, int targetSize) - { - assert !Iterables.any(result, predicate::test); - for (int i = 0 ; result.size() < targetSize && i < list.size() ; ++i) - if (predicate.test(list.get(i))) - result.add(list.get(i)); - return this; - } - public Select add(Predicate<Replica> predicate) - { - return add(predicate, Integer.MAX_VALUE); - } - public C get() - { - return snapshot(result); - } - } - - /** - * An efficient method for selecting a subset of replica via a sequence of filters. 
- * - * Example: select().add(filter1).add(filter2, 3).get(); - * - * @return a Select object - */ - public final Select select() - { - return select(list.size()); - } - public final Select select(int expectedSize) - { - return new Select(expectedSize); - } - - public final C sorted(Comparator<Replica> comparator) + /** see {@link ReplicaCollection#sorted(Comparator)}*/ + public final C sorted(Comparator<Replica> comparator) { List<Replica> copy = new ArrayList<>(list); copy.sort(comparator); @@ -267,9 +234,9 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect if (replicas.isEmpty()) return extraReplicas; Mutable<C> mutable = replicas.newMutable(replicas.size() + extraReplicas.size()); - mutable.addAll(replicas); + mutable.addAll(replicas, Mutable.Conflict.NONE); mutable.addAll(extraReplicas, ignoreConflicts); - return mutable.asImmutableView(); + return mutable.asSnapshot(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index d168052..bad736f 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -141,33 +141,33 @@ public abstract class AbstractReplicationStrategy */ public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata); - public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, Runnable callback, WriteType writeType, long queryStartNanoTime) { - return getWriteResponseHandler(replicaLayout, callback, 
writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel()); + return getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel()); } - public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, Runnable callback, WriteType writeType, long queryStartNanoTime, ConsistencyLevel idealConsistencyLevel) { AbstractWriteResponseHandler resultResponseHandler; - if (replicaLayout.consistencyLevel.isDatacenterLocal()) + if (replicaPlan.consistencyLevel().isDatacenterLocal()) { // block for in this context will be localnodes block. - resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime); + resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime); } - else if (replicaLayout.consistencyLevel == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) + else if (replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) { - resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime); + resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime); } else { - resultResponseHandler = new WriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime); + resultResponseHandler = new WriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime); } //Check if tracking the ideal consistency level is configured @@ -176,14 +176,14 @@ public abstract class AbstractReplicationStrategy //If ideal and requested are the same just use this handler to track the ideal consistency level //This is also used so that the ideal 
consistency level handler when constructed knows it is the ideal //one for tracking purposes - if (idealConsistencyLevel == replicaLayout.consistencyLevel) + if (idealConsistencyLevel == replicaPlan.consistencyLevel()) { resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler); } else { //Construct a delegate response handler to use to track the ideal consistency level - AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel), + AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel), callback, writeType, queryStartNanoTime, http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/Endpoints.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java b/src/java/org/apache/cassandra/locator/Endpoints.java index 3d5faa4..28e578c 100644 --- a/src/java/org/apache/cassandra/locator/Endpoints.java +++ b/src/java/org/apache/cassandra/locator/Endpoints.java @@ -29,6 +29,11 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +/** + * A collection of Endpoints for a given ring position. 
This will typically reside in a ReplicaLayout, + * representing some subset of the endpoints for the Token or Range + * @param <E> The concrete type of Endpoints, that will be returned by the modifying methods + */ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaCollection<E> { static final Map<InetAddressAndPort, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>()); @@ -89,17 +94,32 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC return filter(r -> !self.equals(r.endpoint())); } + public Replica selfIfPresent() + { + InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); + return byEndpoint().get(self); + } + + /** + * @return a collection without the provided endpoints, otherwise in the same order as this collection + */ public E without(Set<InetAddressAndPort> remove) { return filter(r -> !remove.contains(r.endpoint())); } + /** + * @return a collection with only the provided endpoints (ignoring any not present), otherwise in the same order as this collection + */ public E keep(Set<InetAddressAndPort> keep) { return filter(r -> keep.contains(r.endpoint())); } - public E keep(Iterable<InetAddressAndPort> endpoints) + /** + * @return a collection containing the Replica from this collection for the provided endpoints, in the order of the provided endpoints + */ + public E select(Iterable<InetAddressAndPort> endpoints, boolean ignoreMissing) { ReplicaCollection.Mutable<E> copy = newMutable( endpoints instanceof Collection<?> @@ -109,10 +129,14 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC Map<InetAddressAndPort, Replica> byEndpoint = byEndpoint(); for (InetAddressAndPort endpoint : endpoints) { - Replica keep = byEndpoint.get(endpoint); - if (keep == null) + Replica select = byEndpoint.get(endpoint); + if (select == null) + { + if (!ignoreMissing) + throw new IllegalArgumentException(endpoint + " is not present in " + this); 
continue; - copy.add(keep, ReplicaCollection.Mutable.Conflict.DUPLICATE); + } + copy.add(select, ReplicaCollection.Mutable.Conflict.DUPLICATE); } return copy.asSnapshot(); } @@ -124,34 +148,19 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC * 2) because a movement that changes the type of replication from transient to full must be handled * differently for reads and writes (with the reader treating it as transient, and writer as full) * - * The method haveConflicts() below, and resolveConflictsInX, are used to detect and resolve any issues + * The method {@link ReplicaLayout#haveWriteConflicts} can be used to detect and resolve any issues */ public static <E extends Endpoints<E>> E concat(E natural, E pending) { return AbstractReplicaCollection.concat(natural, pending, Conflict.NONE); } - public static <E extends Endpoints<E>> boolean haveConflicts(E natural, E pending) - { - Set<InetAddressAndPort> naturalEndpoints = natural.endpoints(); - for (InetAddressAndPort pendingEndpoint : pending.endpoints()) - { - if (naturalEndpoints.contains(pendingEndpoint)) - return true; - } - return false; - } - - // must apply first - public static <E extends Endpoints<E>> E resolveConflictsInNatural(E natural, E pending) - { - return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true)); - } - - // must apply second - public static <E extends Endpoints<E>> E resolveConflictsInPending(E natural, E pending) + public static <E extends Endpoints<E>> E append(E replicas, Replica extraReplica) { - return pending.without(natural.endpoints()); + Mutable<E> mutable = replicas.newMutable(replicas.size() + 1); + mutable.addAll(replicas); + mutable.add(extraReplica, Conflict.NONE); + return mutable.asSnapshot(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForRange.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/locator/EndpointsForRange.java b/src/java/org/apache/cassandra/locator/EndpointsForRange.java index c2d8232..f812951 100644 --- a/src/java/org/apache/cassandra/locator/EndpointsForRange.java +++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java @@ -86,7 +86,7 @@ public class EndpointsForRange extends Endpoints<EndpointsForRange> { boolean hasSnapshot; public Mutable(Range<Token> range) { this(range, 0); } - public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>()); } + public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); } public void add(Replica replica, Conflict ignoreConflict) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForToken.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java b/src/java/org/apache/cassandra/locator/EndpointsForToken.java index f24c615..3446dc9 100644 --- a/src/java/org/apache/cassandra/locator/EndpointsForToken.java +++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java @@ -77,7 +77,7 @@ public class EndpointsForToken extends Endpoints<EndpointsForToken> { boolean hasSnapshot; public Mutable(Token token) { this(token, 0); } - public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>()); } + public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); } public void add(Replica replica, Conflict ignoreConflict) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java 
b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java index 74828ad..1773173 100644 --- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java +++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java @@ -173,7 +173,7 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint { boolean hasSnapshot; public Mutable(InetAddressAndPort endpoint) { this(endpoint, 0); } - public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>()); } + public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); } public void add(Replica replica, Conflict ignoreConflict) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaCollection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java index 6833f4b..d1006dc 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java +++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java @@ -62,6 +62,11 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera public abstract boolean contains(Replica replica); /** + * @return the number of replicas that match the predicate + */ + public abstract int count(Predicate<Replica> predicate); + + /** * @return a *eagerly constructed* copy of this collection containing the Replica that match the provided predicate. * An effort will be made to either return ourself, or a subList, where possible. * It is guaranteed that no changes to any upstream Mutable will affect the state of the result. 
@@ -108,16 +113,30 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera C asImmutableView(); /** - * @return an Immutable clone that assumes this Mutable will never be modified again. - * If this is not true, behaviour is undefined. + * @return an Immutable clone that assumes this Mutable will never be modified again, + * so its contents can be reused. + * + * This Mutable should enforce that it is no longer modified. */ C asSnapshot(); - enum Conflict { NONE, DUPLICATE, ALL} + /** + * Passed to add() and addAll() as ignoreConflicts parameter. The meaning of conflict varies by collection type + * (for Endpoints, it is a duplicate InetAddressAndPort; for RangesAtEndpoint it is a duplicate Range). + */ + enum Conflict + { + /** fail on addition of any such conflict */ + NONE, + /** fail on addition of any such conflict where the contents differ (first occurrence and position wins) */ + DUPLICATE, + /** ignore all conflicts (the first occurrence and position wins) */ + ALL + } /** * @param replica add this replica to the end of the collection - * @param ignoreConflict if false, fail on any conflicting additions (as defined by C's semantics) + * @param ignoreConflict conflicts to ignore, see {@link Conflict} */ void add(Replica replica, Conflict ignoreConflict); http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java index 946a7f8..f48c989 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java +++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java @@ -18,364 +18,299 @@ package org.apache.cassandra.locator; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.function.Predicate; - -import 
com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; - import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.net.IAsyncCallback; -import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; -import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; import org.apache.cassandra.utils.FBUtilities; -import static com.google.common.collect.Iterables.any; +import java.util.Set; +import java.util.function.Predicate; /** - * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors - * for building the relevant layout. + * The relevant replicas for an operation over a given range or token. 
* - * Constitutes: - * - the 'natural' replicas replicating the range or token relevant for the operation - * - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates - * - the 'selected' replicas, those that should be targeted for any operation - * - 'all' replicas represents natural+pending - * - * @param <E> the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange) - * @param <L> the type of itself, including its type parameters, for return type of modifying methods + * @param <E> */ -public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> +public abstract class ReplicaLayout<E extends Endpoints<E>> { - private volatile E all; - protected final E natural; - protected final E pending; - protected final E selected; - - protected final Keyspace keyspace; - protected final ConsistencyLevel consistencyLevel; - - private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected) - { - this(keyspace, consistencyLevel, natural, pending, selected, null); - } + private final E natural; - private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all) + ReplicaLayout(E natural) { - assert selected != null; - assert pending == null || !Endpoints.haveConflicts(natural, pending); - this.keyspace = keyspace; - this.consistencyLevel = consistencyLevel; this.natural = natural; - this.pending = pending; - this.selected = selected; - // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural - if (all == null && pending == null) - all = natural; - this.all = all; } - public Replica getReplicaFor(InetAddressAndPort endpoint) - { - return natural.byEndpoint().get(endpoint); - } - - public E natural() + /** + * The 'natural' owners of the ring position(s), as implied by the current ring layout. 
+ * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but + * have not yet finished obtaining their view of the range. + */ + public final E natural() { return natural; } - public E all() - { - E result = all; - if (result == null) - all = result = Endpoints.concat(natural, pending); - return result; - } - - public E selected() - { - return selected; - } - /** - * @return the pending replicas - will be null for read layouts - * TODO: ideally we would enforce at compile time that read layouts have no pending to access + * All relevant owners of the ring position(s) for this operation, as implied by the current ring layout. + * For writes, this will include pending owners, and for reads it will be equivalent to natural() */ - public E pending() - { - return pending; - } - - public int blockFor() + public E all() { - return pending == null - ? consistencyLevel.blockFor(keyspace) - : consistencyLevel.blockForWrite(keyspace, pending); + return natural; } - public Keyspace keyspace() + public String toString() { - return keyspace; + return "ReplicaLayout [ natural: " + natural + " ]"; } - public ConsistencyLevel consistencyLevel() + public static class ForTokenRead extends ReplicaLayout<EndpointsForToken> implements ForToken { - return consistencyLevel; - } - - abstract public L withSelected(E replicas); - - abstract public L withConsistencyLevel(ConsistencyLevel cl); - - public L forNaturalUncontacted() - { - E more; - if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) + public ForTokenRead(EndpointsForToken natural) { - IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch; - String localDC = DatabaseDescriptor.getLocalDataCenter(); + super(natural); + } - more = natural.filter(replica -> !selected.contains(replica) && - snitch.getDatacenter(replica).equals(localDC)); - } else + @Override + public Token token() { - more = natural.filter(replica 
-> !selected.contains(replica)); + return natural().token(); } - return withSelected(more); + public ReplicaLayout.ForTokenRead filter(Predicate<Replica> filter) + { + EndpointsForToken filtered = natural().filter(filter); + // AbstractReplicaCollection.filter returns itself if all elements match the filter + if (filtered == natural()) return this; + return new ReplicaLayout.ForTokenRead(filtered); + } } - public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange> + public static class ForRangeRead extends ReplicaLayout<EndpointsForRange> implements ForRange { - public final AbstractBounds<PartitionPosition> range; + final AbstractBounds<PartitionPosition> range; - @VisibleForTesting - public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected) + public ForRangeRead(AbstractBounds<PartitionPosition> range, EndpointsForRange natural) { - // Range queries do not contact pending replicas - super(keyspace, consistencyLevel, natural, null, selected); + super(natural); this.range = range; } @Override - public ForRange withSelected(EndpointsForRange newSelected) + public AbstractBounds<PartitionPosition> range() { - return new ForRange(keyspace, consistencyLevel, range, natural, newSelected); + return range; } - @Override - public ForRange withConsistencyLevel(ConsistencyLevel cl) + public ReplicaLayout.ForRangeRead filter(Predicate<Replica> filter) { - return new ForRange(keyspace, cl, range, natural, selected); + EndpointsForRange filtered = natural().filter(filter); + // AbstractReplicaCollection.filter returns itself if all elements match the filter + if (filtered == natural()) return this; + return new ReplicaLayout.ForRangeRead(range(), filtered); } } - public static class ForToken extends ReplicaLayout<EndpointsForToken, ForToken> + public static class ForWrite<E extends Endpoints<E>> extends ReplicaLayout<E> { - public final Token token; + 
final E all; + final E pending; - @VisibleForTesting - public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected) + ForWrite(E natural, E pending, E all) { - super(keyspace, consistencyLevel, natural, pending, selected); - this.token = token; + super(natural); + assert pending != null && !haveWriteConflicts(natural, pending); + if (all == null) + all = Endpoints.concat(natural, pending); + this.all = all; + this.pending = pending; } - public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all) + public final E all() { - super(keyspace, consistencyLevel, natural, pending, selected, all); - this.token = token; + return all; } - public ForToken withSelected(EndpointsForToken newSelected) + public final E pending() { - return new ForToken(keyspace, consistencyLevel, token, natural, pending, newSelected); + return pending; } - @Override - public ForToken withConsistencyLevel(ConsistencyLevel cl) + public String toString() { - return new ForToken(keyspace, cl, token, natural, pending, selected); + return "ReplicaLayout [ natural: " + natural() + ", pending: " + pending + " ]"; } } - public static class ForPaxos extends ForToken + public static class ForTokenWrite extends ForWrite<EndpointsForToken> implements ForToken { - private final int requiredParticipants; - - private ForPaxos(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int requiredParticipants, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all) + public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending) { - super(keyspace, consistencyLevel, token, natural, pending, selected, all); - this.requiredParticipants = requiredParticipants; + this(natural, pending, null); } - - public int 
getRequiredParticipants() + public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken all) { - return requiredParticipants; + super(natural, pending, all); } - } - public static ForToken forSingleReplica(Keyspace keyspace, Token token, Replica replica) - { - EndpointsForToken singleReplica = EndpointsForToken.of(token, replica); - return new ForToken(keyspace, ConsistencyLevel.ONE, token, singleReplica, EndpointsForToken.empty(token), singleReplica, singleReplica); - } - - public static ForRange forSingleReplica(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica) - { - EndpointsForRange singleReplica = EndpointsForRange.of(replica); - return new ForRange(keyspace, ConsistencyLevel.ONE, range, singleReplica, singleReplica); - } + @Override + public Token token() { return natural().token(); } - public static ForToken forCounterWrite(Keyspace keyspace, Token token, Replica replica) - { - return forSingleReplica(keyspace, token, replica); + public ReplicaLayout.ForTokenWrite filter(Predicate<Replica> filter) + { + EndpointsForToken filtered = all().filter(filter); + // AbstractReplicaCollection.filter returns itself if all elements match the filter + if (filtered == all()) return this; + // unique by endpoint, so can for efficiency filter only on endpoint + return new ReplicaLayout.ForTokenWrite( + natural().keep(filtered.endpoints()), + pending().keep(filtered.endpoints()), + filtered + ); + } } - public static ForToken forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException + public interface ForRange { - // A single case we write not for range or token, but multiple mutations to many tokens - Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); - EndpointsForToken natural = EndpointsForToken.copyOf(token, SystemReplicas.getSystemReplicas(endpoints)); - EndpointsForToken pending = EndpointsForToken.empty(token); - ConsistencyLevel 
consistencyLevel = natural.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO; - - return forWriteWithDownNodes(keyspace, consistencyLevel, token, natural, pending); + public AbstractBounds<PartitionPosition> range(); } - public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token) throws UnavailableException + public interface ForToken { - return forWrite(keyspace, consistencyLevel, token, Predicates.alwaysTrue()); + public Token token(); } - public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Predicate<InetAddressAndPort> isAlive) throws UnavailableException + /** + * Gets the 'natural' and 'pending' replicas that own a given token, with no filtering or processing. + * + * Since a write is intended for all nodes (except, unless necessary, transient replicas), this method's + * only responsibility is to fetch the 'natural' and 'pending' replicas, then resolve any conflicts + * {@link ReplicaLayout#haveWriteConflicts(Endpoints, Endpoints)} + */ + public static ReplicaLayout.ForTokenWrite forTokenWriteLiveAndDown(Keyspace keyspace, Token token) { - EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), token); + // TODO: race condition to fetch these. implications?? 
+ EndpointsForToken natural = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token); EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName()); - return forWrite(keyspace, consistencyLevel, token, natural, pending, isAlive); - } - - public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending) throws UnavailableException - { - return forWrite(keyspace, consistencyLevel, token, natural, pending, Predicates.alwaysTrue()); + return forTokenWrite(natural, pending); } - public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> isAlive) throws UnavailableException + public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending) { - if (Endpoints.haveConflicts(natural, pending)) + if (haveWriteConflicts(natural, pending)) { - natural = Endpoints.resolveConflictsInNatural(natural, pending); - pending = Endpoints.resolveConflictsInPending(natural, pending); + natural = resolveWriteConflictsInNatural(natural, pending); + pending = resolveWriteConflictsInPending(natural, pending); } - - if (!any(natural, Replica::isTransient) && !any(pending, Replica::isTransient)) - { - EndpointsForToken selected = Endpoints.concat(natural, pending).filter(r -> isAlive.test(r.endpoint())); - return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected); - } - - return forWrite(keyspace, consistencyLevel, token, consistencyLevel.blockForWrite(keyspace, pending), natural, pending, isAlive); + return new ReplicaLayout.ForTokenWrite(natural, pending); } - public static ReplicaLayout.ForPaxos forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException + /** + * Detect if we 
have any endpoint in both pending and full; this can occur either due to races (there is no isolation) + * or because an endpoint is transitioning between full and transient replication status. + * + * We essentially always prefer the full version for writes, because this is stricter. + * + * For transient->full transitions: + * + * Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration, + * it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form, + * as there is always a quorum of nodes receiving the write. However, ring ownership changes are not atomic or + * consistent across the cluster, and it is possible for writers to see different ring states. + * + * Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair + * (as the normal contract dictates) when it completes its transition. + * + * While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative + * available to us today to mitigate, and (we think) the easiest to reason about. + * + * For full->transient transitions: + * + * In this case, things are dicier, because in theory we can trigger this change instantly. All we need to do is + * drop some data, surely? + * + * Ring movements can put us in a pickle; any other node could believe us to be full when we have become transient, + * and perform a full data request to us that we believe ourselves capable of answering, but that we are not. + * If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing + * its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware + * of the data inconsistency. 
+ * + * This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when + * the transition takes effect. As such, a node can hold an incorrect belief about its own ownership ranges. + * + * This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it. + * It is a little more dangerous with transient replication, however, because we can completely answer a request without + * ever touching a digest, meaning we are less likely to attempt to repair any inconsistency. + * + * We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance. + * + * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket + * to avoid corrupting our count. This is fine for writes, all we're doing is ensuring we always write to the node, + * instead of selectively. + * + * @param natural + * @param pending + * @param <E> + * @return + */ + static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E pending) { - Token tk = key.getToken(); - EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), tk); - EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspace.getName()); - if (Endpoints.haveConflicts(natural, pending)) + Set<InetAddressAndPort> naturalEndpoints = natural.endpoints(); + for (InetAddressAndPort pendingEndpoint : pending.endpoints()) { - natural = Endpoints.resolveConflictsInNatural(natural, pending); - pending = Endpoints.resolveConflictsInPending(natural, pending); + if (naturalEndpoints.contains(pendingEndpoint)) + return true; } - - // TODO CASSANDRA-14547 - Replicas.temporaryAssertFull(natural); - Replicas.temporaryAssertFull(pending); - - if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL) - { - // Restrict natural and pending to node in the local DC 
only - String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); - IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica)); - - natural = natural.filter(isLocalDc); - pending = pending.filter(isLocalDc); - } - - int participants = pending.size() + natural.size(); - int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833 - - EndpointsForToken all = Endpoints.concat(natural, pending); - EndpointsForToken selected = all.filter(IAsyncCallback.isReplicaAlive); - if (selected.size() < requiredParticipants) - throw UnavailableException.create(consistencyForPaxos, requiredParticipants, selected.size()); - - // We cannot allow CAS operations with 2 or more pending endpoints, see #8346. - // Note that we fake an impossible number of required nodes in the unavailable exception - // to nail home the point that it's an impossible operation no matter how many nodes are live. - if (pending.size() > 1) - throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pending.size()), - consistencyForPaxos, - participants + 1, - selected.size()); - - return new ReplicaLayout.ForPaxos(keyspace, consistencyForPaxos, key.getToken(), requiredParticipants, natural, pending, selected, all); + return false; } /** - * We want to send mutations to as many full replicas as we can, and just as many transient replicas - * as we need to meet blockFor. 
+ * MUST APPLY FIRST + * See {@link ReplicaLayout#haveWriteConflicts} + * @return a 'natural' replica collection, that has had its conflicts with pending repaired */ - @VisibleForTesting - public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int blockFor, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> livePredicate) throws UnavailableException + private static <E extends Endpoints<E>> E resolveWriteConflictsInNatural(E natural, E pending) { - EndpointsForToken all = Endpoints.concat(natural, pending); - EndpointsForToken selected = all - .select() - .add(r -> r.isFull() && livePredicate.test(r.endpoint())) - .add(r -> r.isTransient() && livePredicate.test(r.endpoint()), blockFor) - .get(); - - consistencyLevel.assureSufficientLiveNodesForWrite(keyspace, selected, pending); - - return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected, all); + return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true)); } - public static ForToken forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry) + /** + * MUST APPLY SECOND + * See {@link ReplicaLayout#haveWriteConflicts} + * @return a 'pending' replica collection, that has had its conflicts with natural repaired + */ + private static <E extends Endpoints<E>> E resolveWriteConflictsInPending(E natural, E pending) { - EndpointsForToken natural = StorageProxy.getLiveSortedReplicasForToken(keyspace, token); - EndpointsForToken selected = consistencyLevel.filterForQuery(keyspace, natural, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE)); - - // Throw UAE early if we don't have enough replicas. 
- consistencyLevel.assureSufficientLiveNodesForRead(keyspace, selected); - - return new ForToken(keyspace, consistencyLevel, token, natural, null, selected); + return pending.without(natural.endpoints()); } - public static ForRange forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected) + /** + * @return the read layout for a token - this includes only live natural replicas, i.e. those that are not pending + * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch + */ + static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(Keyspace keyspace, Token token) { - return new ForRange(keyspace, consistencyLevel, range, natural, selected); + EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token); + replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas); + replicas = replicas.filter(FailureDetector.isReplicaAlive); + return new ReplicaLayout.ForTokenRead(replicas); } - public String toString() + /** + * TODO: we should really double check that the provided range does not overlap multiple token ring regions + * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending + * and not marked down by the failure detector. 
these are reverse sorted by the badness score of the configured snitch + */ + static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(Keyspace keyspace, AbstractBounds<PartitionPosition> range) { - return "ReplicaLayout [ CL: " + consistencyLevel + " keyspace: " + keyspace + " natural: " + natural + "pending: " + pending + " selected: " + selected + " ]"; + EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(range.right); + replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas); + replicas = replicas.filter(FailureDetector.isReplicaAlive); + return new ReplicaLayout.ForRangeRead(range, replicas); } -} +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java new file mode 100644 index 0000000..4d6127b --- /dev/null +++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.locator; + +import com.google.common.collect.Iterables; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.AbstractBounds; + +import java.util.function.Predicate; + +public abstract class ReplicaPlan<E extends Endpoints<E>> +{ + protected final Keyspace keyspace; + protected final ConsistencyLevel consistencyLevel; + + // all nodes we will contact via any mechanism, including hints + // i.e., for: + // - reads, only live natural replicas + // ==> live.natural().subList(0, blockFor + initial speculate) + // - writes, includes all full, and any pending replicas, (and only any necessary transient ones to make up the difference) + // ==> liveAndDown.natural().filter(isFull) ++ liveAndDown.pending() ++ live.natural.filter(isTransient, req) + // - paxos, includes all live replicas (natural+pending), for this DC if LOCAL_SERIAL + // ==> live.all() (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal)) + private final E contacts; + + ReplicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, E contacts) + { + assert contacts != null; + this.keyspace = keyspace; + this.consistencyLevel = consistencyLevel; + this.contacts = contacts; + } + + public abstract int blockFor(); + public abstract void assureSufficientReplicas(); + + public E contacts() { return contacts; } + public boolean contacts(Replica replica) { return contacts.contains(replica); } + public Keyspace keyspace() { return keyspace; } + public ConsistencyLevel consistencyLevel() { return consistencyLevel; } + + public static abstract class ForRead<E extends Endpoints<E>> extends ReplicaPlan<E> + { + // all nodes we *could* contact; typically all natural replicas that are believed to be alive + // we will consult this collection to find uncontacted nodes we might contact if we doubt we will meet consistency level + private final
E candidates; + + ForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, E candidates, E contact) + { + super(keyspace, consistencyLevel, contact); + this.candidates = candidates; + } + + public int blockFor() { return consistencyLevel.blockFor(keyspace); } + public void assureSufficientReplicas() { consistencyLevel.assureSufficientLiveReplicasForRead(keyspace, candidates()); } + + public E candidates() { return candidates; } + + public E uncontactedCandidates() + { + return candidates().filter(r -> !contacts(r)); + } + + public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate) + { + return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull(); + } + + public Replica getReplicaFor(InetAddressAndPort endpoint) + { + return candidates().byEndpoint().get(endpoint); + } + + public String toString() + { + return "ReplicaPlan.ForRead [ CL: " + consistencyLevel + " keyspace: " + keyspace + " candidates: " + candidates + " contacts: " + contacts() + " ]"; + } + } + + public static class ForTokenRead extends ForRead<EndpointsForToken> + { + public ForTokenRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken candidates, EndpointsForToken contact) + { + super(keyspace, consistencyLevel, candidates, contact); + } + + ForTokenRead withContact(EndpointsForToken newContact) + { + return new ForTokenRead(keyspace, consistencyLevel, candidates(), newContact); + } + } + + public static class ForRangeRead extends ForRead<EndpointsForRange> + { + final AbstractBounds<PartitionPosition> range; + + public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact) + { + super(keyspace, consistencyLevel, candidates, contact); + this.range = range; + } + + public AbstractBounds<PartitionPosition> range() { return range; } + + ForRangeRead withContact(EndpointsForRange newContact) + { + return new 
ForRangeRead(keyspace, consistencyLevel, range, candidates(), newContact); + } + } + + public static abstract class ForWrite<E extends Endpoints<E>> extends ReplicaPlan<E> + { + // TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch + final E pending; + final E liveAndDown; + final E live; + + ForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, E pending, E liveAndDown, E live, E contact) + { + super(keyspace, consistencyLevel, contact); + this.pending = pending; + this.liveAndDown = liveAndDown; + this.live = live; + } + + public int blockFor() { return consistencyLevel.blockForWrite(keyspace, pending()); } + public void assureSufficientReplicas() { consistencyLevel.assureSufficientLiveReplicasForWrite(keyspace, live(), pending()); } + + /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */ + public E pending() { return pending; } + /** Replicas that can participate in the write - this always includes all nodes (pending and natural) in all DCs, except for paxos LOCAL_SERIAL (which is local DC only) */ + public E liveAndDown() { return liveAndDown; } + /** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive */ + public E live() { return live; } + /** Calculate which live endpoints we could have contacted, but chose not to */ + public E liveUncontacted() { return live().filter(r -> !contacts(r)); } + /** Test liveness, consistent with the upfront analysis done for this operation (i.e.
test membership of live()) */ + public boolean isAlive(Replica replica) { return live.endpoints().contains(replica.endpoint()); } + + public String toString() + { + return "ReplicaPlan.ForWrite [ CL: " + consistencyLevel + " keyspace: " + keyspace + " liveAndDown: " + liveAndDown + " live: " + live + " contacts: " + contacts() + " ]"; + } + } + + public static class ForTokenWrite extends ForWrite<EndpointsForToken> + { + public ForTokenWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact) + { + super(keyspace, consistencyLevel, pending, liveAndDown, live, contact); + } + + private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken newContact) + { + return new ReplicaPlan.ForTokenWrite(keyspace, newConsistencyLevel, pending(), liveAndDown(), live(), newContact); + } + + ForTokenWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { return copy(newConsistencylevel, contacts()); } + public ForTokenWrite withContact(EndpointsForToken newContact) { return copy(consistencyLevel, newContact); } + } + + public static class ForPaxosWrite extends ForWrite<EndpointsForToken> + { + final int requiredParticipants; + + ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact, int requiredParticipants) + { + super(keyspace, consistencyLevel, pending, liveAndDown, live, contact); + this.requiredParticipants = requiredParticipants; + } + + public int requiredParticipants() { return requiredParticipants; } + } + + /** + * Used by AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair to share a ReplicaPlan whose 'contacts' replicas + * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write) + * + * The internal reference is not volatile, despite being shared 
between threads. The initial reference provided to + * the constructor should be visible by the normal process of sharing data between threads (i.e. executors, etc) + * and any updates will either be seen or not seen, perhaps not promptly, but certainly not incompletely. + * The contained ReplicaPlan has only final member properties, so it cannot be seen partially initialised. + */ + public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>> + { + /** + * add the provided replica to this shared plan, by updating the internal reference + */ + public void addToContacts(Replica replica); + /** + * get the shared replica plan, non-volatile (so maybe stale) but no risk of partially initialised + */ + public P get(); + /** + * get the shared replica plan, non-volatile (so maybe stale) but no risk of partially initialised, + * but replace its 'contacts' with those provided + */ + public abstract P getWithContacts(E endpoints); + } + + public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead> + { + private ForTokenRead replicaPlan; + SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; } + public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); } + public ForTokenRead get() { return replicaPlan; } + public ForTokenRead getWithContacts(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); } + } + + public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead> + { + private ForRangeRead replicaPlan; + SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; } + public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); } + public ForRangeRead get() { return replicaPlan; } + public ForRangeRead getWithContacts(EndpointsForRange newContact) { return 
replicaPlan.withContact(newContact); } + } + + public static SharedForTokenRead shared(ForTokenRead replicaPlan) { return new SharedForTokenRead(replicaPlan); } + public static SharedForRangeRead shared(ForRangeRead replicaPlan) { return new SharedForRangeRead(replicaPlan); } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org