Updated Branches:
  refs/heads/trunk fcd0aebb6 -> 01e02192e

CAS may return false but still commit the insert

patch by slebresne; reviewed by jbellis for CASSANDRA-6013


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6ec4eef2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6ec4eef2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6ec4eef2

Branch: refs/heads/trunk
Commit: 6ec4eef2b0d7c7ef235299d47016dd64ff2ace04
Parents: 3bba32c
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Mon Sep 16 08:39:13 2013 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Mon Sep 16 08:39:13 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../apache/cassandra/service/StorageProxy.java  | 18 ++++---
 .../service/paxos/ProposeCallback.java          | 49 +++++++++++++++++---
 3 files changed, 56 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ec4eef2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e3c25a..5517cee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,7 +15,7 @@
  * Require superuser status for adding triggers (CASSANDRA-5963)
  * Make standalone scrubber handle old and new style leveled manifest
    (CASSANDRA-6005)
- * Fix paxos not always replaying when it should  (CASSANDRA-6012)
+ * Fix paxos bugs (CASSANDRA-6012, 6013)
 Merged from 1.2:
 1.2.10
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ec4eef2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index aa50d02..3e716f5 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -255,7 +254,7 @@ public class StorageProxy implements StorageProxyMBean
             // TODO turn null updates into delete?
             Commit proposal = Commit.newProposal(key, ballot, updates);
             Tracing.trace("CAS precondition is met; proposing client-requested 
updates for {}", ballot);
-            if (proposePaxos(proposal, liveEndpoints, requiredParticipants))
+            if (proposePaxos(proposal, liveEndpoints, requiredParticipants, 
true))
             {
                 if (consistencyForCommit == ConsistencyLevel.ANY)
                     sendCommit(proposal, liveEndpoints);
@@ -380,7 +379,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 Tracing.trace("Finishing incomplete paxos round {}", 
inProgress);
                 Commit refreshedInProgress = 
Commit.newProposal(inProgress.key, ballot, inProgress.update);
-                if (proposePaxos(refreshedInProgress, liveEndpoints, 
requiredParticipants))
+                if (proposePaxos(refreshedInProgress, liveEndpoints, 
requiredParticipants, false))
                 {
                     commitPaxos(refreshedInProgress, ConsistencyLevel.QUORUM);
                 }
@@ -436,16 +435,23 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, List<InetAddress> 
endpoints, int requiredParticipants)
+    private static boolean proposePaxos(Commit proposal, List<InetAddress> 
endpoints, int requiredParticipants, boolean timeoutIfPartial)
     throws WriteTimeoutException
     {
-        ProposeCallback callback = new ProposeCallback(requiredParticipants);
+        ProposeCallback callback = new ProposeCallback(endpoints.size(), 
requiredParticipants, !timeoutIfPartial);
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, 
Commit.serializer);
         for (InetAddress target : endpoints)
             MessagingService.instance().sendRR(message, target, callback);
+
         callback.await();
 
-        return callback.getSuccessful() >= requiredParticipants;
+        if (callback.isSuccessful())
+            return true;
+
+        if (timeoutIfPartial && !callback.isFullyRefused())
+            throw new WriteTimeoutException(WriteType.CAS, 
ConsistencyLevel.SERIAL, callback.getAcceptCount(), requiredParticipants);
+
+        return false;
     }
 
     private static void commitPaxos(Commit proposal, ConsistencyLevel 
consistencyLevel) throws WriteTimeoutException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ec4eef2/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java 
b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
index 67aed45..0075840 100644
--- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
@@ -28,15 +28,32 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessageIn;
 
+/**
+ * ProposeCallback has two modes of operation, controlled by the failFast 
parameter.
+ *
+ * In failFast mode, we will return a failure as soon as enough nodes reject the 
proposal that the required number of accepts can no longer be reached.
+ * This is used when replaying a proposal from an earlier leader.
+ *
+ * Otherwise, we wait until either all replicas have replied or we achieve
+ * the desired quorum. We continue to wait for all replicas even after we know 
we cannot succeed
+ * because we need to know whether no node at all has accepted or whether at least 
one has.
+ * In the former case, the proposer is guaranteed that no-one will
+ * replay its value; in the latter it is not, so we must time out in case another
+ * leader replays it before we can; see CASSANDRA-6013
+ */
 public class ProposeCallback extends AbstractPaxosCallback<Boolean>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ProposeCallback.class);
 
-    private final AtomicInteger successful = new AtomicInteger(0);
+    private final AtomicInteger accepts = new AtomicInteger(0);
+    private final int requiredAccepts;
+    private final boolean failFast;
 
-    public ProposeCallback(int targets)
+    public ProposeCallback(int totalTargets, int requiredTargets, boolean 
failFast)
     {
-        super(targets);
+        super(totalTargets);
+        this.requiredAccepts = requiredTargets;
+        this.failFast = failFast;
     }
 
     public void response(MessageIn<Boolean> msg)
@@ -44,12 +61,32 @@ public class ProposeCallback extends 
AbstractPaxosCallback<Boolean>
         logger.debug("Propose response {} from {}", msg.payload, msg.from);
 
         if (msg.payload)
-            successful.incrementAndGet();
+            accepts.incrementAndGet();
+
         latch.countDown();
+
+        if (isSuccessful() || (failFast && (latch.getCount() + accepts.get() < 
requiredAccepts)))
+        {
+            while (latch.getCount() > 0)
+                latch.countDown();
+        }
+    }
+
+    public int getAcceptCount()
+    {
+        return accepts.get();
+    }
+
+    public boolean isSuccessful()
+    {
+        return accepts.get() >= requiredAccepts;
     }
 
-    public int getSuccessful()
+    // Note: this is only reliable if !failFast
+    public boolean isFullyRefused()
     {
-        return successful.get();
+        // We need to check the latch first to avoid racing with a late arrival
+        // between the latch check and the accepts one
+        return latch.getCount() == 0 && accepts.get() == 0;
     }
 }

Reply via email to