This is an automated email from the ASF dual-hosted git repository. slebresne pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new 2d0b168 Fix serial read/non-applying CAS linearizability 2d0b168 is described below commit 2d0b16804785660e8515aca9944784fb3733c619 Author: Sylvain Lebresne <lebre...@gmail.com> AuthorDate: Wed May 20 14:56:02 2020 +0200 Fix serial read/non-applying CAS linearizability Before this patch, a SERIAL read or a non-applying CAS replay any in-progress commit by calling `beginAndRepairPaxos`, but only a quorum of nodes is contacted, so a minority of nodes could have an unfinished in-progress proposal in their Paxos state. If such in-progress proposal is not replayed by a SERIAL read/non-applying CAS, it should never be replayed by any following operation as that would break serializability, but nothing was done to avoid this. This patch ensures that both a SERIAL read or a non-applying CAS commit an empty update before succeeding. This ensures that no prior incomplete in-progress proposal can be replayed (such proposal will be discarded as older than the last committed ballot). As this fix has a performance impact on SERIAL reads, a flag is provided to disable the new code (even if this is discouraged by a warning). 
Patch by Sylvain Lebresne, reviewed by Benjamin Lerer for CASSANDRA-12126 --- CHANGES.txt | 1 + NEWS.txt | 8 + .../org/apache/cassandra/service/StorageProxy.java | 231 +++++-- .../org/apache/cassandra/service/paxos/Commit.java | 6 + .../apache/cassandra/service/paxos/PaxosState.java | 3 - .../cassandra/service/paxos/PrepareCallback.java | 12 +- .../cassandra/distributed/impl/Instance.java | 112 ++-- .../apache/cassandra/distributed/test/CASTest.java | 679 +++++++++++++++++++++ 8 files changed, 939 insertions(+), 113 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 546fd98..d6f406d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.24: + * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126) * Avoid potential NPE in JVMStabilityInspector (CASSANDRA-16294) * Improved check of num_tokens against the length of initial_token (CASSANDRA-14477) * Fix a race condition on ColumnFamilyStore and TableMetrics (CASSANDRA-16228) diff --git a/NEWS.txt b/NEWS.txt index 7034c2c..6cc5e84 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -47,6 +47,14 @@ using the provided 'sstableupgrade' tool. Upgrading --------- + - This release fixes a correctness issue with SERIAL reads, and LWT writes that do not apply. + Unfortunately, this fix has a performance impact on read performance at the SERIAL or + LOCAL_SERIAL consistency levels. For heavy users of such SERIAL reads, the performance + impact may be noticeable and may also result in an increase in timeouts. For that + reason, an opt-in system property has been added to disable the fix: + -Dcassandra.unsafe.disable-serial-reads-linearizability=true + Use this flag at your own risk as it reverts SERIAL reads to the incorrect behavior of + previous versions. See CASSANDRA-12126 for details. - In cassandra.yaml, when using vnodes num_tokens must be defined if initial_token is defined. If it is not defined, or not equal to the numbers of tokens defined in initial_tokens, the node will not start. 
See CASSANDRA-14477 for details. diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index c7888c4..91dd991 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -24,6 +24,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import com.google.common.base.Predicate; import com.google.common.annotations.VisibleForTesting; @@ -109,6 +110,10 @@ public class StorageProxy implements StorageProxyMBean */ private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", FBUtilities.getAvailableProcessors() * 10)); + private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = "cassandra.unsafe.disable-serial-reads-linearizability"; + private static final boolean disableSerialReadLinearizability = + Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY, "false")); + private StorageProxy() { } @@ -163,6 +168,16 @@ public class StorageProxy implements StorageProxyMBean .execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter)); } }; + + if (disableSerialReadLinearizability) + { + logger.warn("This node was started with -D{}. SERIAL (and LOCAL_SERIAL) reads coordinated by this node " + + "will not offer linearizability (see CASSANDRA-12126 for details on what this mean) with " + + "respect to other SERIAL operations. Please note that, with this flag, SERIAL reads will be " + + "slower than QUORUM reads, yet offer no more guarantee. 
This flag should only be used in " + + "the restricted case of upgrading from a pre-CASSANDRA-12126 version, and only if you " + + "understand the tradeoff.", DISABLE_SERIAL_READ_LINEARIZABILITY_KEY); + } } /** @@ -216,26 +231,12 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException { final long start = System.nanoTime(); - int contentions = 0; try { - consistencyForPaxos.validateForCas(); - consistencyForCommit.validateForCasCommit(keyspaceName); - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); - while (System.nanoTime() - start < timeout) + Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () -> { - // for simplicity, we'll do a single liveness check at the start of each attempt - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); - List<InetAddress> liveEndpoints = p.left; - int requiredParticipants = p.right; - - final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); - final UUID ballot = pair.left; - contentions += pair.right; - // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds()); @@ -251,11 +252,10 @@ public class StorageProxy implements StorageProxyMBean { Tracing.trace("CAS precondition does not match current values {}", current); casWriteMetrics.conditionNotMet.inc(); - return current.rowIterator(); + return Pair.create(PartitionUpdate.emptyUpdate(metadata, key), current.rowIterator()); } - // finish the paxos round w/ the desired updates - // TODO turn null updates into 
delete? + // Create the desired updates PartitionUpdate updates = request.makeUpdates(current); // Apply triggers to cas updates. A consideration here is that @@ -267,47 +267,141 @@ public class StorageProxy implements StorageProxyMBean // InvalidRequestException) any which aren't. updates = TriggerExecutor.instance.execute(updates); + return Pair.create(updates, null); + }; - Commit proposal = Commit.newProposal(ballot, updates); - Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) - { - commitPaxos(proposal, consistencyForCommit, true); - Tracing.trace("CAS successful"); - return null; - } + return doPaxos(metadata, + key, + consistencyForPaxos, + consistencyForCommit, + consistencyForCommit, + state, + start, + casWriteMetrics, + updateProposer); - Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); - contentions++; - Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); - // continue to retry - } - - throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName))); } - catch (WriteTimeoutException|ReadTimeoutException e) + catch (WriteTimeoutException | ReadTimeoutException e) { casWriteMetrics.timeouts.mark(); throw e; } - catch (WriteFailureException|ReadFailureException e) + catch (WriteFailureException | ReadFailureException e) { casWriteMetrics.failures.mark(); throw e; } - catch(UnavailableException e) + catch (UnavailableException e) { casWriteMetrics.unavailables.mark(); throw e; } finally { - if(contentions > 0) - casWriteMetrics.contention.update(contentions); casWriteMetrics.addNano(System.nanoTime() - start); } } + /** + * Performs the Paxos rounds for a given proposal, retrying when preempted until the timeout. 
+ * + * <p>The main 'configurable' of this method is the {@code createUpdateProposal} method: it is called by the method + * once a ballot has been successfully 'prepared' to generate the update to 'propose' (and commit if the proposal is + * successful). That method also generates the result that the whole method will return. Note that due to retrying, + * this method may be called multiple times and does not have to return the same results. + * + * @param metadata the table to update with Paxos. + * @param key the partition updated. + * @param consistencyForPaxos the serial consistency of the operation (either {@link ConsistencyLevel#SERIAL} or + * {@link ConsistencyLevel#LOCAL_SERIAL}). + * @param consistencyForReplayCommits the consistency for the commit phase of "replayed" in-progress operations. + * @param consistencyForCommit the consistency for the commit phase of _this_ operation update. + * @param state the client state. + * @param queryStartNanoTime the nano time for the start of the query this is part of. + * @param casMetrics the metrics to update for this operation. + * @param createUpdateProposal method called after a successful 'prepare' phase to obtain 1) the actual update of + * this operation and 2) the result that the whole method should return. This can return {@code null} in the + * special case where, after having "prepared" (and thus potentially replayed in-progress updates), we don't want + * to propose anything (the whole method then returns {@code null}). + * @return the second element of the pair returned by {@code createUpdateProposal} (for the last call of that method + if that method is called multiple times due to retries). 
+ */ + private static RowIterator doPaxos(CFMetaData metadata, + DecoratedKey key, + ConsistencyLevel consistencyForPaxos, + ConsistencyLevel consistencyForReplayCommits, + ConsistencyLevel consistencyForCommit, + ClientState state, + long queryStartNanoTime, + CASClientRequestMetrics casMetrics, + Supplier<Pair<PartitionUpdate, RowIterator>> createUpdateProposal) + throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException + { + int contentions = 0; + try + { + consistencyForPaxos.validateForCas(); + consistencyForReplayCommits.validateForCasCommit(metadata.ksName); + consistencyForCommit.validateForCasCommit(metadata.ksName); + + long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); + while (System.nanoTime() - queryStartNanoTime < timeout) + { + // for simplicity, we'll do a single liveness check at the start of each attempt + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); + List<InetAddress> liveEndpoints = p.left; + int requiredParticipants = p.right; + + final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, + key, + metadata, + liveEndpoints, + requiredParticipants, + consistencyForPaxos, + consistencyForReplayCommits, + casMetrics, + state); + final UUID ballot = pair.left; + contentions += pair.right; + + Pair<PartitionUpdate, RowIterator> proposalPair = createUpdateProposal.get(); + // See method javadoc: null here is code for "stop here and return null". + if (proposalPair == null) + return null; + + Commit proposal = Commit.newProposal(ballot, proposalPair.left); + Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); + if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) + { + // We skip committing accepted updates when they are empty. 
This is an optimization which works + // because we also skip replaying those same empty update in beginAndRepairPaxos (see the longer + // comment there). As empty update are somewhat common (serial reads and non-applying CAS propose + // them), this is worth bothering. + if (!proposal.update.isEmpty()) + commitPaxos(proposal, consistencyForCommit, true); + RowIterator result = proposalPair.right; + if (result != null) + Tracing.trace("CAS did not apply"); + else + Tracing.trace("CAS applied successfully"); + return result; + } + + Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); + contentions++; + Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); + // continue to retry + } + + throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName))); + } + finally + { + if(contentions > 0) + casMetrics.contention.update(contentions); + } + } + private static Predicate<InetAddress> sameDCPredicateFor(final String dc) { final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); @@ -364,7 +458,7 @@ public class StorageProxy implements StorageProxyMBean int requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, - final boolean isWrite, + CASClientRequestMetrics casMetrics, ClientState state) throws WriteTimeoutException, WriteFailureException { @@ -397,18 +491,31 @@ public class StorageProxy implements StorageProxyMBean continue; } - Commit inProgress = summary.mostRecentInProgressCommitWithUpdate; + Commit inProgress = summary.mostRecentInProgressCommit; Commit mostRecent = summary.mostRecentCommit; // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that // needs to be completed, so do it. + // One special case we make is for update that are empty (which are proposed by serial reads and + // non-applying CAS). 
While we could handle those as any other updates, we can optimize this somewhat by + // neither committing those empty updates, nor replaying in-progress ones. The reasoning is this: as the + // update is empty, we have nothing to apply to storage in the commit phase, so the only reason to commit + // would be to update the MRC. However, if we skip replaying those empty updates, then we don't need to + // update the MRC for following updates to make progress (that is, if we didn't have the empty update skip + // below _but_ skipped updating the MRC on empty updates, then we'd be stuck always proposing that same + // empty update). And the reason skipping that replay is safe is that when an operation tries to propose + // an empty value, there can be only 2 cases: + // 1) the propose succeeds, meaning a quorum of nodes accept it, in which case we are guaranteed no earlier + // pending operation can ever be replayed (which is what we want to guarantee with the empty update). + // 2) the propose does not succeed. But then the operation proposing the empty update will not succeed + // either (it will retry or ultimately timeout), and we're actually ok if earlier pending operation gets + // replayed in that case. + // Tl;dr, it is safe to skip committing empty updates _as long as_ we also skip replaying them below. And + // doing so is more efficient, so we do so. 
if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) { Tracing.trace("Finishing incomplete paxos round {}", inProgress); - if(isWrite) - casWriteMetrics.unfinishedCommit.inc(); - else - casReadMetrics.unfinishedCommit.inc(); + casMetrics.unfinishedCommit.inc(); Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update); if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos)) { @@ -1535,21 +1642,31 @@ public class StorageProxy implements StorageProxyMBean PartitionIterator result = null; try { - // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel); - List<InetAddress> liveEndpoints = p.left; - int requiredParticipants = p.right; - - // does the work of applying in-progress writes; throws UAE or timeout if it can't - final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL - ? ConsistencyLevel.LOCAL_QUORUM - : ConsistencyLevel.QUORUM; + final ConsistencyLevel consistencyForReplayCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL + ? ConsistencyLevel.LOCAL_QUORUM + : ConsistencyLevel.QUORUM; try { - final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); - if (pair.right > 0) - casReadMetrics.contention.update(pair.right); + // Commit an empty update to make sure all in-progress updates that should be finished first is, _and_ + // that no other in-progress can get resurrected. + Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = + disableSerialReadLinearizability + ? 
() -> null + : () -> Pair.create(PartitionUpdate.emptyUpdate(metadata, key), null); + // When replaying, we commit at quorum/local quorum, as we want to be sure the following read (done at + // quorum/local_quorum) sees any replayed updates. Our own update is however empty, and those don't even + // get committed due to an optimiation described in doPaxos/beingRepairAndPaxos, so the commit + // consistency is irrelevant (we use ANY just to emphasis that we don't wait on our commit). + doPaxos(metadata, + key, + consistencyLevel, + consistencyForReplayCommitOrFetch, + ConsistencyLevel.ANY, + state, + start, + casReadMetrics, + updateProposer); } catch (WriteTimeoutException e) { @@ -1560,7 +1677,7 @@ public class StorageProxy implements StorageProxyMBean throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false); } - result = fetchRows(group.commands, consistencyForCommitOrFetch); + result = fetchRows(group.commands, consistencyForReplayCommitOrFetch); } catch (UnavailableException e) { diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index 95bd464..a3f491b 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -82,6 +82,12 @@ public class Commit return this.ballot.equals(ballot); } + /** Whether this is an empty commit, that is one with no updates. 
*/ + public boolean isEmpty() + { + return update.isEmpty(); + } + public Mutation makeMutation() { return new Mutation(update); diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index ee1ba6a..8ab9a98 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -20,12 +20,9 @@ */ package org.apache.cassandra.service.paxos; -import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.Lock; -import com.google.common.base.Throwables; import com.google.common.util.concurrent.Striped; -import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index ff81803..26e292e 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -46,7 +46,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> public boolean promised = true; public Commit mostRecentCommit; public Commit mostRecentInProgressCommit; - public Commit mostRecentInProgressCommitWithUpdate; private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>(); @@ -56,7 +55,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected mostRecentCommit = Commit.emptyCommit(key, metadata); mostRecentInProgressCommit = Commit.emptyCommit(key, metadata); - mostRecentInProgressCommitWithUpdate = Commit.emptyCommit(key, metadata); } public synchronized void response(MessageIn<PrepareResponse> message) @@ -64,9 
+62,8 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> PrepareResponse response = message.payload; logger.trace("Prepare response {} from {}", response, message.from); - // In case of clock skew, another node could be proposing with ballot that are quite a bit - // older than our own. In that case, we record the more recent commit we've received to make - // sure we re-prepare on an older ballot. + // We set the mostRecentInProgressCommit even if we're not promised as, in that case, the ballot of that commit + // will be used to avoid generating a ballot that has not chance to win on retry (think clock skew). if (response.inProgressCommit.isAfter(mostRecentInProgressCommit)) mostRecentInProgressCommit = response.inProgressCommit; @@ -82,11 +79,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> if (response.mostRecentCommit.isAfter(mostRecentCommit)) mostRecentCommit = response.mostRecentCommit; - // If some response has an update, then we should replay the update with the highest ballot. 
So find - // the the highest commit that actually have an update - if (response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) && !response.inProgressCommit.update.isEmpty()) - mostRecentInProgressCommitWithUpdate = response.inProgressCommit; - latch.countDown(); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index ff0095d..0d2eb4b 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -647,55 +647,59 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance return YamlConfigurationLoader.fromMap(params, check, Config.class); } - private void initializeRing(ICluster cluster) + public static void addToRing(boolean bootstrapping, IInstance peer) { - // This should be done outside instance in order to avoid serializing config - String partitionerName = config.getString("partitioner"); - List<String> initialTokens = new ArrayList<>(); - List<InetSocketAddress> hosts = new ArrayList<>(); - List<UUID> hostIds = new ArrayList<>(); - for (int i = 1 ; i <= cluster.size() ; ++i) + try { - IInstanceConfig config = cluster.get(i).config(); - initialTokens.add(config.getString("initial_token")); - hosts.add(config.broadcastAddress()); - hostIds.add(config.hostId()); + IInstanceConfig config = peer.config(); + IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner")); + Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token")); + InetAddress address = config.broadcastAddress().getAddress(); + + UUID hostId = config.hostId(); + Gossiper.runInGossipStageBlocking(() -> { + Gossiper.instance.initializeNodeUnsafe(address, hostId, 1); + Gossiper.instance.injectApplicationState(address, + ApplicationState.TOKENS, + new 
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token))); + StorageService.instance.onChange(address, + ApplicationState.STATUS, + bootstrapping + ? new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token)) + : new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token))); + Gossiper.instance.realMarkAlive(address, Gossiper.instance.getEndpointStateForEndpoint(address)); + }); + int messagingVersion = peer.isShutdown() + ? MessagingService.current_version + : Math.min(MessagingService.current_version, peer.getMessagingVersion()); + MessagingService.instance().setVersion(address, messagingVersion); + + if (!bootstrapping) + assert StorageService.instance.getTokenMetadata().isMember(address); + PendingRangeCalculatorService.instance.blockUntilFinished(); } + catch (Throwable e) // UnknownHostException + { + throw new RuntimeException(e); + } + } + public static void removeFromRing(IInstance peer) + { try { - IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName); - StorageService storageService = StorageService.instance; - List<Token> tokens = new ArrayList<>(); - for (String token : initialTokens) - tokens.add(partitioner.getTokenFactory().fromString(token)); - - for (int i = 0; i < tokens.size(); i++) - { - InetSocketAddress ep = hosts.get(i); - UUID hostId = hostIds.get(i); - Token token = tokens.get(i); - Gossiper.runInGossipStageBlocking(() -> { - Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1); - Gossiper.instance.injectApplicationState(ep.getAddress(), - ApplicationState.TOKENS, - new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token))); - storageService.onChange(ep.getAddress(), - ApplicationState.STATUS, - new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token))); - Gossiper.instance.realMarkAlive(ep.getAddress(), 
Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress())); - }); - int messagingVersion = cluster.get(ep).isShutdown() - ? MessagingService.current_version - : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion()); - MessagingService.instance().setVersion(ep.getAddress(), messagingVersion); - } - - // check that all nodes are in token metadata - for (int i = 0; i < tokens.size(); ++i) - assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress()); - - storageService.setNormalModeUnsafe(); + IInstanceConfig config = peer.config(); + IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner")); + Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token")); + InetAddress address = config.broadcastAddress().getAddress(); + + Gossiper.runInGossipStageBlocking(() -> { + StorageService.instance.onChange(address, + ApplicationState.STATUS, + new VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token), 0L)); + Gossiper.instance.removeEndpoint(address); + }); + PendingRangeCalculatorService.instance.blockUntilFinished(); } catch (Throwable e) // UnknownHostException { @@ -703,6 +707,28 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance } } + public static void addToRingNormal(IInstance peer) + { + addToRing(false, peer); + assert StorageService.instance.getTokenMetadata().isMember(peer.broadcastAddress().getAddress()); + } + + public static void addToRingBootstrapping(IInstance peer) + { + addToRing(true, peer); + } + + private static void initializeRing(ICluster cluster) + { + for (int i = 1 ; i <= cluster.size() ; ++i) + addToRing(false, cluster.get(i)); + + for (int i = 1; i <= cluster.size(); ++i) + assert StorageService.instance.getTokenMetadata().isMember(cluster.get(i).broadcastAddress().getAddress()); + + StorageService.instance.setNormalModeUnsafe(); + } + public Future<Void> shutdown() { return 
shutdown(true); diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java new file mode 100644 index 0000000..0b1dce6 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java @@ -0,0 +1,679 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.UUID; +import java.util.function.BiConsumer; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.distributed.impl.Instance; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import static org.apache.cassandra.distributed.shared.AssertUtils.fail; +import static org.apache.cassandra.distributed.shared.AssertUtils.row; +import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_COMMIT; +import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PREPARE; +import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PROPOSE; +import static org.apache.cassandra.net.MessagingService.Verb.READ; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CASTest extends TestBaseImpl +{ + @Test + public void simpleUpdate() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 1)); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 3 WHERE pk = 1 and ck = 1 IF v = 2", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 1)); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 2)); + } + } + + @Test + public void incompletePrepare() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L)))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + IMessageFilters.Filter drop = cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2, 3).drop(); + try + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM); + Assert.fail(); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + drop.off(); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL)); + } + } + + @Test + public void incompletePropose() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L)))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + 
IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2, 3).drop(); + try + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM); + Assert.fail(); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + drop1.off(); + // make sure we encounter one of the in-progress proposals so we complete it + cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2).drop(); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 2)); + } + } + + @Test + public void incompleteCommit() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L)))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + try + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM); + Assert.fail(); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + drop1.off(); + // make sure we see one of the successful commits + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2).drop(); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", 
ConsistencyLevel.SERIAL), + row(1, 1, 2)); + } + } + + private int[] paxosAndReadVerbs() { + return new int[] { + MessagingService.Verb.PAXOS_PREPARE.ordinal(), + MessagingService.Verb.PAXOS_PROPOSE.ordinal(), + MessagingService.Verb.PAXOS_COMMIT.ordinal(), + MessagingService.Verb.READ.ordinal() + }; + } + + /** + * Base test to ensure that if a write times out but with a proposal accepted by some nodes (less then quorum), and + * a following SERIAL operation does not observe that write (the node having accepted it do not participate in that + * following operation), then that write is never applied, even when the nodes having accepted the original proposal + * participate. + * + * <p>In other words, if an operation timeout, it may or may not be applied, but that "fate" is persistently decided + * by the very SERIAL operation that "succeed" (in the sense of 'not timing out or throwing some other exception'). + * + * @param postTimeoutOperation1 a SERIAL operation executed after an initial write that inserts the row [0, 0] times + * out. It is executed with a QUORUM of nodes that have _not_ see the timed out + * proposal, and so that operation should expect that the [0, 0] write has not taken + * place. + * @param postTimeoutOperation2 a 2nd SERIAL operation executed _after_ {@code postTimeoutOperation1}, with no + * write executed between the 2 operation. Contrarily to the 1st operation, the QORUM + * for this operation _will_ include the node that got the proposal for the [0, 0] + * insert but didn't participated to {@code postTimeoutOperation1}}. That operation + * should also no witness that [0, 0] write (since {@code postTimeoutOperation1} + * didn't). + * @param loseCommitOfOperation1 if {@code true}, the test will also drop the "commits" messages for + * {@code postTimeoutOperation1}. 
In general, the test should behave the same with or + * without that flag since a value is decided as soon as it has been "accepted by + * quorum" and the commits should always be properly replayed. + */ + private void consistencyAfterWriteTimeoutTest(BiConsumer<String, ICoordinator> postTimeoutOperation1, + BiConsumer<String, ICoordinator> postTimeoutOperation2, + boolean loseCommitOfOperation1) throws IOException + { + try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L)))) + { + String table = KEYSPACE + ".t"; + cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY KEY, v int)"); + + // We do a CAS insertion, but have with the PROPOSE message dropped on node 1 and 2. The CAS will not get + // through and should timeout. Importantly, node 3 does receive and answer the PROPOSE. + IMessageFilters.Filter dropProposeFilter = cluster.filters() + .inbound() + .verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal()) + .to(1, 2) + .drop(); + try + { + // NOTE: the consistency below is the "commit" one, so it doesn't matter at all here. + cluster.coordinator(1) + .execute("INSERT INTO " + table + "(k, v) VALUES (0, 0) IF NOT EXISTS", ConsistencyLevel.ONE); + fail("The insertion should have timed-out"); + } + catch (Exception e) + { + // We expect a write timeout. If we get one, the test can continue, otherwise, we rethrow. Note that we + // look at the root cause because the dtest framework effectively wrap the exception in a RuntimeException + // (we could just look at the immediate cause, but this feel a bit more resilient this way). + // TODO: we can't use an instanceof below because the WriteTimeoutException we get is from a different class + // loader than the one the test run under, and that's our poor-man work-around. This kind of things should + // be improved at the dtest API level. 
+ if (!e.getCause().getClass().getSimpleName().equals("WriteTimeoutException")) + throw e; + } + finally + { + dropProposeFilter.off(); + } + + // Isolates node 3 and executes the SERIAL operation. As neither node 1 or 2 got the initial insert proposal, + // there is nothing to "replay" and the operation should assert the table is still empty. + IMessageFilters.Filter ignoreNode3Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop(); + IMessageFilters.Filter dropCommitFilter = null; + if (loseCommitOfOperation1) + { + dropCommitFilter = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).to(1, 2).drop(); + } + try + { + postTimeoutOperation1.accept(table, cluster.coordinator(1)); + } + finally + { + ignoreNode3Filter.off(); + if (dropCommitFilter != null) + dropCommitFilter.off(); + } + + // Node 3 is now back and we isolate node 2 to ensure the next read hits node 1 and 3. + // What we want to ensure is that despite node 3 having the initial insert in its paxos state in a position of + // being replayed, that insert is _not_ replayed (it would contradict serializability since the previous + // operation asserted nothing was inserted). It is this execution that failed before CASSANDRA-12126. + IMessageFilters.Filter ignoreNode2Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop(); + try + { + postTimeoutOperation2.accept(table, cluster.coordinator(1)); + } + finally + { + ignoreNode2Filter.off(); + } + } + } + + /** + * Tests that if a write timeouts and a following serial read does not see that write, then no following reads sees + * it, even if some nodes still have the write in their paxos state. + * + * <p>This specifically test for the inconsistency described/fixed by CASSANDRA-12126. 
+ */ + @Test + public void readConsistencyAfterWriteTimeoutTest() throws IOException + { + BiConsumer<String, ICoordinator> operation = + (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0", + ConsistencyLevel.SERIAL)); + + consistencyAfterWriteTimeoutTest(operation, operation, false); + consistencyAfterWriteTimeoutTest(operation, operation, true); + } + + /** + * Tests that if a write timeouts, then a following CAS succeed but does not apply in a way that indicate the write + * has not applied, then no following CAS can see that initial insert , even if some nodes still have the write in + * their paxos state. + * + * <p>This specifically test for the inconsistency described/fixed by CASSANDRA-12126. + */ + @Test + public void nonApplyingCasConsistencyAfterWriteTimeout() throws IOException + { + // Note: we use CL.ANY so that the operation don't timeout in the case where we "lost" the operation1 commits. + // The commit CL shouldn't have impact on this test anyway, so this doesn't diminishes the test. + BiConsumer<String, ICoordinator> operation = + (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", + ConsistencyLevel.ANY)); + consistencyAfterWriteTimeoutTest(operation, operation, false); + consistencyAfterWriteTimeoutTest(operation, operation, true); + } + + /** + * Tests that if a write timeouts and a following serial read does not see that write, then no following CAS see + * that initial insert, even if some nodes still have the write in their paxos state. + * + * <p>This specifically test for the inconsistency described/fixed by CASSANDRA-12126. 
+ */ + @Test + public void mixedReadAndNonApplyingCasConsistencyAfterWriteTimeout() throws IOException + { + BiConsumer<String, ICoordinator> operation1 = + (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0", + ConsistencyLevel.SERIAL)); + BiConsumer<String, ICoordinator> operation2 = + (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", + ConsistencyLevel.QUORUM)); + consistencyAfterWriteTimeoutTest(operation1, operation2, false); + consistencyAfterWriteTimeoutTest(operation1, operation2, true); + } + + /** + * Tests that if a write timeouts and a following CAS succeed but does not apply in a way that indicate the write + * has not applied, then following serial reads do no see that write, even if some nodes still have the write in + * their paxos state. + * + * <p>This specifically test for the inconsistency described/fixed by CASSANDRA-12126. + */ + @Test + public void mixedNonApplyingCasAndReadConsistencyAfterWriteTimeout() throws IOException + { + // Note: we use CL.ANY so that the operation don't timeout in the case where we "lost" the operation1 commits. + // The commit CL shouldn't have impact on this test anyway, so this doesn't diminishes the test. + BiConsumer<String, ICoordinator> operation1 = + (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", + ConsistencyLevel.ANY)); + BiConsumer<String, ICoordinator> operation2 = + (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0", + ConsistencyLevel.SERIAL)); + consistencyAfterWriteTimeoutTest(operation1, operation2, false); + consistencyAfterWriteTimeoutTest(operation1, operation2, true); + } + + // TODO: this shoud probably be moved into the dtest API. 
+ private void assertCasNotApplied(Object[][] resultSet) + { + assertFalse("Expected a CAS resultSet (with at least application result) but got an empty one.", + resultSet.length == 0); + assertFalse("Invalid empty first row in CAS resultSet.", resultSet[0].length == 0); + Object wasApplied = resultSet[0][0]; + assertTrue("Expected 1st column of CAS resultSet to be a boolean, but got a " + wasApplied.getClass(), + wasApplied instanceof Boolean); + assertFalse("Expected CAS to not be applied, but was applied.", (Boolean)wasApplied); + } + + /** + * Failed write (by node that did not yet witness a range movement via gossip) is witnessed later as successful + * conflicting with another successful write performed by a node that did witness the range movement + * Prepare, Propose and Commit A to {1, 2} + * Range moves to {2, 3, 4} + * Prepare and Propose B (=> !A) to {3, 4} + */ + @Ignore + @Test + public void testSuccessfulWriteBeforeRangeMovement() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {1} is unaware (yet) that {4} is an owner of the token + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF 
NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + for (int i = 1 ; i <= 3 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {4} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop(); + assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(false, pk, 1, 1, null)); + } + } + + /** + * Failed write (by node that did not yet witness a range movement via gossip) is witnessed later as successful + * conflicting with another successful write performed by a node that did witness the range movement + * - Range moves from {1, 2, 3} to {2, 3, 4}, witnessed by X (not by !X) + * - X: Prepare, Propose and Commit A to {3, 4} + * - !X: Prepare and Propose B (=>!A) to {1, 2} + */ + @Ignore + @Test + public void testConflictingWritesWithStaleRingInformation() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {1} is unaware (yet) that {4} is an owner of the token + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + + // {4} promises, accepts and commits on !{2} => {3, 4} + int pk = pk(cluster, 1, 2); + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(4).to(2).drop(); + 
assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // {1} promises, accepts and commmits on !{3} => {1, 2} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(false, pk, 1, 1, null)); + } + } + + /** + * Successful write during range movement, not witnessed by read after range movement. + * Very similar to {@link #testConflictingWritesWithStaleRingInformation}. + * + * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X) + * - !X: Prepare and Propose to {1, 2} + * - Range movement witnessed by !X + * - Any: Prepare and Read from {3, 4} + */ + @Ignore + @Test + public void testSucccessfulWriteDuringRangeMovementFollowedByRead() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + // make it so {4} is bootstrapping, and this has not propagated to other nodes yet + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {1} promises and accepts on !{3} => {1, 2}; commmits on !{2, 3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // finish topology change + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {3} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop(); + assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk), + row(pk, 1, 1)); + } + } + + /** + * Successful write during range movement not witnessed by write after range movement + * + * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X) + * - !X: Prepare and Propose to {1, 2} + * - Range movement witnessed by !X + * - Any: Prepare and Propose to {3, 4} + */ + @Ignore + @Test + public void testSuccessfulWriteDuringRangeMovementFollowedByConflicting() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {4} is bootstrapping, and this has not propagated to other nodes yet + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1} + 
cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // finish topology change + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {3} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop(); + assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(false, pk, 1, 1, null)); + + // TODO: repair and verify base table state + } + } + + /** + * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation + * being performed with stale ring information. + * This is a particular special case of stale ring information sequencing, which probably would be resolved + * by fixing each of the more isolated cases (but is unique, so deserving of its own test case). 
+ * See CASSANDRA-15745 + * + * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X) + * - X: Prepare to {2, 3, 4} + * - X: Propose to {4} + * - !X: Prepare and Propose to {1, 2} + * - Range move visible by !X + * - Any: Prepare and Read from {3, 4} + */ + @Ignore + @Test + public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByRead() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {4} is bootstrapping, and this has not propagated to other nodes yet + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {4} promises and accepts on !{1} => {2, 3, 4}; commits on !{1, 2, 3} => {4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop(); + try + { + cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk); + Assert.assertTrue(false); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + + // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + 
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // finish topology change + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {3} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop(); + assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk), + row(pk, 1, null, 2)); + } + } + + /** + * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation + * being performed with stale ring information. + * This is a particular special case of stale ring information sequencing, which probably would be resolved + * by fixing each of the more isolated cases (but is unique, so deserving of its own test case). 
+ * See CASSANDRA-15745 + * + * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X) + * - X: Prepare to {2, 3, 4} + * - X: Propose to {4} + * - !X: Prepare and Propose to {1, 2} + * - Range move visible by !X + * - Any: Prepare and Propose to {3, 4} + */ + @Ignore + @Test + public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByWrite() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {4} is bootstrapping, and this has not propagated to other nodes yet + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {4} promises and accepts on !{1} => {2, 3, 4}; commits on !{1, 2, 3} => {4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop(); + try + { + cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk); + Assert.assertTrue(false); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + + // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + 
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // finish topology change + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {3} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop(); + assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(false, 5, 1, null, 2)); + } + } + + private static int pk(Cluster cluster, int lb, int ub) + { + return pk(cluster.get(lb), cluster.get(ub)); + } + + private static int pk(IInstance lb, IInstance ub) + { + return pk(Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token")), + Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token"))); + } + + private static int pk(Token lb, Token ub) + { + int pk = 0; + Token pkt; + while (lb.compareTo(pkt = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) >= 0 || ub.compareTo(pkt) < 0) + ++pk; + return pk; + } + + private static void debugOwnership(Cluster cluster, int pk) + { + for (int i = 1 ; i <= cluster.size() ; ++i) + System.out.println(i + ": " + cluster.get(i).appliesOnInstance((Integer v) -> StorageService.instance.getNaturalAndPendingEndpoints(KEYSPACE, Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v)))) + .apply(pk)); + } + + private static void debugPaxosState(Cluster cluster, int pk) + { + UUID cfid = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.cfId); + for (int i = 1 ; i <= cluster.size() ; ++i) + for (Object[] row : 
cluster.get(i).executeInternal("select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), cfid)) + System.out.println(i + ": " + (row[0] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[0])) + ", " + (row[1] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[1])) + ", " + (row[2] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[2]))); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org