Updated Branches: refs/heads/trunk fcd0aebb6 -> 01e02192e
CAS may return false but still commit the insert patch by slebresne; reviewed by jbellis for CASSANDRA-6013 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6ec4eef2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6ec4eef2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6ec4eef2 Branch: refs/heads/trunk Commit: 6ec4eef2b0d7c7ef235299d47016dd64ff2ace04 Parents: 3bba32c Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Mon Sep 16 08:39:13 2013 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Mon Sep 16 08:39:13 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../apache/cassandra/service/StorageProxy.java | 18 ++++--- .../service/paxos/ProposeCallback.java | 49 +++++++++++++++++--- 3 files changed, 56 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ec4eef2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9e3c25a..5517cee 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,7 +15,7 @@ * Require superuser status for adding triggers (CASSANDRA-5963) * Make standalone scrubber handle old and new style leveled manifest (CASSANDRA-6005) - * Fix paxos not always replaying when it should (CASSANDRA-6012) + * Fix paxos bugs (CASSANDRA-6012, 6013) Merged from 1.2: 1.2.10 * Fix possible divide-by-zero in HHOM (CASSANDRA-5990) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ec4eef2/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index aa50d02..3e716f5 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -255,7 +254,7 @@ public class StorageProxy implements StorageProxyMBean // TODO turn null updates into delete? Commit proposal = Commit.newProposal(key, ballot, updates); Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, liveEndpoints, requiredParticipants)) + if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true)) { if (consistencyForCommit == ConsistencyLevel.ANY) sendCommit(proposal, liveEndpoints); @@ -380,7 +379,7 @@ public class StorageProxy implements StorageProxyMBean { Tracing.trace("Finishing incomplete paxos round {}", inProgress); Commit refreshedInProgress = Commit.newProposal(inProgress.key, ballot, inProgress.update); - if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants)) + if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false)) { commitPaxos(refreshedInProgress, ConsistencyLevel.QUORUM); } @@ -436,16 +435,23 @@ public class StorageProxy implements StorageProxyMBean return callback; } - private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants) + private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial) throws WriteTimeoutException { - ProposeCallback callback = new ProposeCallback(requiredParticipants); + ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer); for (InetAddress target : endpoints) MessagingService.instance().sendRR(message, target, callback); + callback.await(); - return callback.getSuccessful() >= requiredParticipants; + if (callback.isSuccessful()) + return true; + + if (timeoutIfPartial && !callback.isFullyRefused()) + throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, callback.getAcceptCount(), requiredParticipants); + + return false; } private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ec4eef2/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java index 67aed45..0075840 100644 --- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java @@ -28,15 +28,32 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.net.MessageIn; +/** + * ProposeCallback has two modes of operation, controlled by the failFast parameter. + * + * In failFast mode, we will return a failure as soon as a majority of nodes reject + * the proposal. This is used when replaying a proposal from an earlier leader. + * + * Otherwise, we wait for either all replicas to reply or until we achieve + * the desired quorum. We continue to wait for all replicas even after we know we cannot succeed + * because we need to know if no node at all have accepted or if at least one has. + * In the former case, a proposer is guaranteed no-one will + * replay its value; in the latter we don't, so we must timeout in case another + * leader replays it before we can; see CASSANDRA-6013 + */ public class ProposeCallback extends AbstractPaxosCallback<Boolean> { private static final Logger logger = LoggerFactory.getLogger(ProposeCallback.class); - private final AtomicInteger successful = new AtomicInteger(0); + private final AtomicInteger accepts = new AtomicInteger(0); + private final int requiredAccepts; + private final boolean failFast; - public ProposeCallback(int targets) + public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast) { - super(targets); + super(totalTargets); + this.requiredAccepts = requiredTargets; + this.failFast = failFast; } public void response(MessageIn<Boolean> msg) @@ -44,12 +61,32 @@ public class ProposeCallback extends AbstractPaxosCallback<Boolean> logger.debug("Propose response {} from {}", msg.payload, msg.from); if (msg.payload) - successful.incrementAndGet(); + accepts.incrementAndGet(); + latch.countDown(); + + if (isSuccessful() || (failFast && (latch.getCount() + accepts.get() < requiredAccepts))) + { + while (latch.getCount() > 0) + latch.countDown(); + } + } + + public int getAcceptCount() + { + return accepts.get(); + } + + public boolean isSuccessful() + { + return accepts.get() >= requiredAccepts; } - public int getSuccessful() + // Note: this is only reliable if !failFast + public boolean isFullyRefused() { - return successful.get(); + // We need to check the latch first to avoid racing with a late arrival + // between the latch check and the accepts one + return latch.getCount() == 0 && accepts.get() == 0; } }