Updated Branches:
  refs/heads/trunk 0387cf587 -> bc3597d35

Add support for specifying CAS commit ConsistencyLevel
patch by jbellis; reviewed by aleksey for CASSANDRA-5442


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc3597d3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc3597d3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc3597d3

Branch: refs/heads/trunk
Commit: bc3597d3549850997fd137cc8b74700c62cebf64
Parents: 0387cf5
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon May 27 14:31:44 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon May 27 14:31:44 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 interface/cassandra.thrift                         |   14 +-
 .../org/apache/cassandra/thrift/Cassandra.java     |  147 +++++++++++++-
 .../apache/cassandra/thrift/TimedOutException.java |  136 +++++++++++++-
 .../cql3/statements/ModificationStatement.java     |    2 +-
 .../org/apache/cassandra/db/ConsistencyLevel.java  |   13 ++
 .../org/apache/cassandra/db/WriteResponse.java     |    4 +-
 .../org/apache/cassandra/service/StorageProxy.java |   54 +++++-
 .../cassandra/service/paxos/CommitVerbHandler.java |    7 +
 .../apache/cassandra/thrift/CassandraServer.java   |    4 +-
 .../apache/cassandra/thrift/ThriftConversion.java  |    2 +
 test/system/test_thrift_server.py                  |    9 +-
 12 files changed, 354 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ce9e63..e233ba0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,7 @@
  * Removed compatibility with pre-1.2.5 sstables and network messages
    (CASSANDRA-5511)
  * removed PBSPredictor (CASSANDRA-5455)
- * CAS support (CASSANDRA-5062, 5441, 5443)
+ * CAS support (CASSANDRA-5062, 5441, 5442, 5443)
  * Leveled compaction performs size-tiered compactions in L0 
    (CASSANDRA-5371, 5439)
  * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 1e78d51..cfdaf88 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -148,10 +148,17 @@ exception TimedOutException {
      */
     1: optional i32 acknowledged_by
 
-    /**
-     * in case of atomic_batch_mutate method this field tells if the batch was 
written to the batchlog.
+    /** 
+     * in case of atomic_batch_mutate method this field tells if the batch 
+     * was written to the batchlog.  
      */
     2: optional bool acknowledged_by_batchlog
+
+    /** 
+     * for the CAS method, this field tells if we timed out during the paxos
+     * protocol, as opposed to during the commit of our update
+     */
+    3: optional bool paxos_in_progress
 }
 
 /** invalid authentication request (invalid keyspace, user does not exist, or 
credentials invalid) */
@@ -643,7 +650,8 @@ service Cassandra {
   bool cas(1:required binary key, 
            2:required string column_family,
            3:list<Column> expected,
-           4:list<Column> updates)
+           4:list<Column> updates,
+           5:required ConsistencyLevel 
consistency_level=ConsistencyLevel.QUORUM)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 
3:TimedOutException te),
 
   /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
----------------------------------------------------------------------
diff --git 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
index 34aec98..a2761ca 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
@@ -170,8 +170,9 @@ public class Cassandra {
      * @param column_family
      * @param expected
      * @param updates
+     * @param consistency_level
      */
-    public boolean cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates) throws InvalidRequestException, 
UnavailableException, TimedOutException, org.apache.thrift.TException;
+    public boolean cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, ConsistencyLevel consistency_level) throws 
InvalidRequestException, UnavailableException, TimedOutException, 
org.apache.thrift.TException;
 
     /**
      * Remove data from the row specified by key at the granularity specified 
by column_path, and the given timestamp. Note
@@ -424,7 +425,7 @@ public class Cassandra {
 
     public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn 
column, ConsistencyLevel consistency_level, 
org.apache.thrift.async.AsyncMethodCallback<AsyncClient.add_call> 
resultHandler) throws org.apache.thrift.TException;
 
-    public void cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, 
org.apache.thrift.async.AsyncMethodCallback<AsyncClient.cas_call> 
resultHandler) throws org.apache.thrift.TException;
+    public void cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, ConsistencyLevel consistency_level, 
org.apache.thrift.async.AsyncMethodCallback<AsyncClient.cas_call> 
resultHandler) throws org.apache.thrift.TException;
 
     public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, 
ConsistencyLevel consistency_level, 
org.apache.thrift.async.AsyncMethodCallback<AsyncClient.remove_call> 
resultHandler) throws org.apache.thrift.TException;
 
@@ -903,19 +904,20 @@ public class Cassandra {
       return;
     }
 
-    public boolean cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates) throws InvalidRequestException, 
UnavailableException, TimedOutException, org.apache.thrift.TException
+    public boolean cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, ConsistencyLevel consistency_level) throws 
InvalidRequestException, UnavailableException, TimedOutException, 
org.apache.thrift.TException
     {
-      send_cas(key, column_family, expected, updates);
+      send_cas(key, column_family, expected, updates, consistency_level);
       return recv_cas();
     }
 
-    public void send_cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates) throws org.apache.thrift.TException
+    public void send_cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, ConsistencyLevel consistency_level) throws 
org.apache.thrift.TException
     {
       cas_args args = new cas_args();
       args.setKey(key);
       args.setColumn_family(column_family);
       args.setExpected(expected);
       args.setUpdates(updates);
+      args.setConsistency_level(consistency_level);
       sendBase("cas", args);
     }
 
@@ -2274,9 +2276,9 @@ public class Cassandra {
       }
     }
 
-    public void cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, 
org.apache.thrift.async.AsyncMethodCallback<cas_call> resultHandler) throws 
org.apache.thrift.TException {
+    public void cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, ConsistencyLevel consistency_level, 
org.apache.thrift.async.AsyncMethodCallback<cas_call> resultHandler) throws 
org.apache.thrift.TException {
       checkReady();
-      cas_call method_call = new cas_call(key, column_family, expected, 
updates, resultHandler, this, ___protocolFactory, ___transport);
+      cas_call method_call = new cas_call(key, column_family, expected, 
updates, consistency_level, resultHandler, this, ___protocolFactory, 
___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
@@ -2286,12 +2288,14 @@ public class Cassandra {
       private String column_family;
       private List<Column> expected;
       private List<Column> updates;
-      public cas_call(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, 
org.apache.thrift.async.AsyncMethodCallback<cas_call> resultHandler, 
org.apache.thrift.async.TAsyncClient client, 
org.apache.thrift.protocol.TProtocolFactory protocolFactory, 
org.apache.thrift.transport.TNonblockingTransport transport) throws 
org.apache.thrift.TException {
+      private ConsistencyLevel consistency_level;
+      public cas_call(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, ConsistencyLevel consistency_level, 
org.apache.thrift.async.AsyncMethodCallback<cas_call> resultHandler, 
org.apache.thrift.async.TAsyncClient client, 
org.apache.thrift.protocol.TProtocolFactory protocolFactory, 
org.apache.thrift.transport.TNonblockingTransport transport) throws 
org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
         this.key = key;
         this.column_family = column_family;
         this.expected = expected;
         this.updates = updates;
+        this.consistency_level = consistency_level;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws 
org.apache.thrift.TException {
@@ -2301,6 +2305,7 @@ public class Cassandra {
         args.setColumn_family(column_family);
         args.setExpected(expected);
         args.setUpdates(updates);
+        args.setConsistency_level(consistency_level);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -3722,7 +3727,7 @@ public class Cassandra {
       public cas_result getResult(I iface, cas_args args) throws 
org.apache.thrift.TException {
         cas_result result = new cas_result();
         try {
-          result.success = iface.cas(args.key, args.column_family, 
args.expected, args.updates);
+          result.success = iface.cas(args.key, args.column_family, 
args.expected, args.updates, args.consistency_level);
           result.setSuccessIsSet(true);
         } catch (InvalidRequestException ire) {
           result.ire = ire;
@@ -20085,6 +20090,7 @@ public class Cassandra {
     private static final org.apache.thrift.protocol.TField 
COLUMN_FAMILY_FIELD_DESC = new 
org.apache.thrift.protocol.TField("column_family", 
org.apache.thrift.protocol.TType.STRING, (short)2);
     private static final org.apache.thrift.protocol.TField EXPECTED_FIELD_DESC 
= new org.apache.thrift.protocol.TField("expected", 
org.apache.thrift.protocol.TType.LIST, (short)3);
     private static final org.apache.thrift.protocol.TField UPDATES_FIELD_DESC 
= new org.apache.thrift.protocol.TField("updates", 
org.apache.thrift.protocol.TType.LIST, (short)4);
+    private static final org.apache.thrift.protocol.TField 
CONSISTENCY_LEVEL_FIELD_DESC = new 
org.apache.thrift.protocol.TField("consistency_level", 
org.apache.thrift.protocol.TType.I32, (short)5);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes 
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -20096,13 +20102,23 @@ public class Cassandra {
     public String column_family; // required
     public List<Column> expected; // required
     public List<Column> updates; // required
+    /**
+     * 
+     * @see ConsistencyLevel
+     */
+    public ConsistencyLevel consistency_level; // required
 
     /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
       KEY((short)1, "key"),
       COLUMN_FAMILY((short)2, "column_family"),
       EXPECTED((short)3, "expected"),
-      UPDATES((short)4, "updates");
+      UPDATES((short)4, "updates"),
+      /**
+       * 
+       * @see ConsistencyLevel
+       */
+      CONSISTENCY_LEVEL((short)5, "consistency_level");
 
       private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -20125,6 +20141,8 @@ public class Cassandra {
             return EXPECTED;
           case 4: // UPDATES
             return UPDATES;
+          case 5: // CONSISTENCY_LEVEL
+            return CONSISTENCY_LEVEL;
           default:
             return null;
         }
@@ -20178,24 +20196,30 @@ public class Cassandra {
       tmpMap.put(_Fields.UPDATES, new 
org.apache.thrift.meta_data.FieldMetaData("updates", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new 
org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
               new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 Column.class))));
+      tmpMap.put(_Fields.CONSISTENCY_LEVEL, new 
org.apache.thrift.meta_data.FieldMetaData("consistency_level", 
org.apache.thrift.TFieldRequirementType.REQUIRED, 
+          new 
org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, 
ConsistencyLevel.class)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(cas_args.class, 
metaDataMap);
     }
 
     public cas_args() {
+      this.consistency_level = 
org.apache.cassandra.thrift.ConsistencyLevel.QUORUM;
+
     }
 
     public cas_args(
       ByteBuffer key,
       String column_family,
       List<Column> expected,
-      List<Column> updates)
+      List<Column> updates,
+      ConsistencyLevel consistency_level)
     {
       this();
       this.key = key;
       this.column_family = column_family;
       this.expected = expected;
       this.updates = updates;
+      this.consistency_level = consistency_level;
     }
 
     /**
@@ -20223,6 +20247,9 @@ public class Cassandra {
         }
         this.updates = __this__updates;
       }
+      if (other.isSetConsistency_level()) {
+        this.consistency_level = other.consistency_level;
+      }
     }
 
     public cas_args deepCopy() {
@@ -20235,6 +20262,8 @@ public class Cassandra {
       this.column_family = null;
       this.expected = null;
       this.updates = null;
+      this.consistency_level = 
org.apache.cassandra.thrift.ConsistencyLevel.QUORUM;
+
     }
 
     public byte[] getKey() {
@@ -20373,6 +20402,38 @@ public class Cassandra {
       }
     }
 
+    /**
+     * 
+     * @see ConsistencyLevel
+     */
+    public ConsistencyLevel getConsistency_level() {
+      return this.consistency_level;
+    }
+
+    /**
+     * 
+     * @see ConsistencyLevel
+     */
+    public cas_args setConsistency_level(ConsistencyLevel consistency_level) {
+      this.consistency_level = consistency_level;
+      return this;
+    }
+
+    public void unsetConsistency_level() {
+      this.consistency_level = null;
+    }
+
+    /** Returns true if field consistency_level is set (has been assigned a 
value) and false otherwise */
+    public boolean isSetConsistency_level() {
+      return this.consistency_level != null;
+    }
+
+    public void setConsistency_levelIsSet(boolean value) {
+      if (!value) {
+        this.consistency_level = null;
+      }
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case KEY:
@@ -20407,6 +20468,14 @@ public class Cassandra {
         }
         break;
 
+      case CONSISTENCY_LEVEL:
+        if (value == null) {
+          unsetConsistency_level();
+        } else {
+          setConsistency_level((ConsistencyLevel)value);
+        }
+        break;
+
       }
     }
 
@@ -20424,6 +20493,9 @@ public class Cassandra {
       case UPDATES:
         return getUpdates();
 
+      case CONSISTENCY_LEVEL:
+        return getConsistency_level();
+
       }
       throw new IllegalStateException();
     }
@@ -20443,6 +20515,8 @@ public class Cassandra {
         return isSetExpected();
       case UPDATES:
         return isSetUpdates();
+      case CONSISTENCY_LEVEL:
+        return isSetConsistency_level();
       }
       throw new IllegalStateException();
     }
@@ -20496,6 +20570,15 @@ public class Cassandra {
           return false;
       }
 
+      boolean this_present_consistency_level = true && 
this.isSetConsistency_level();
+      boolean that_present_consistency_level = true && 
that.isSetConsistency_level();
+      if (this_present_consistency_level || that_present_consistency_level) {
+        if (!(this_present_consistency_level && 
that_present_consistency_level))
+          return false;
+        if (!this.consistency_level.equals(that.consistency_level))
+          return false;
+      }
+
       return true;
     }
 
@@ -20523,6 +20606,11 @@ public class Cassandra {
       if (present_updates)
         builder.append(updates);
 
+      boolean present_consistency_level = true && (isSetConsistency_level());
+      builder.append(present_consistency_level);
+      if (present_consistency_level)
+        builder.append(consistency_level.getValue());
+
       return builder.toHashCode();
     }
 
@@ -20574,6 +20662,16 @@ public class Cassandra {
           return lastComparison;
         }
       }
+      lastComparison = 
Boolean.valueOf(isSetConsistency_level()).compareTo(typedOther.isSetConsistency_level());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetConsistency_level()) {
+        lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.consistency_level, 
typedOther.consistency_level);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -20625,6 +20723,14 @@ public class Cassandra {
         sb.append(this.updates);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("consistency_level:");
+      if (this.consistency_level == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.consistency_level);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -20637,6 +20743,9 @@ public class Cassandra {
       if (column_family == null) {
         throw new org.apache.thrift.protocol.TProtocolException("Required 
field 'column_family' was not present! Struct: " + toString());
       }
+      if (consistency_level == null) {
+        throw new org.apache.thrift.protocol.TProtocolException("Required 
field 'consistency_level' was not present! Struct: " + toString());
+      }
       // check for sub-struct validity
     }
 
@@ -20728,6 +20837,14 @@ public class Cassandra {
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
               }
               break;
+            case 5: // CONSISTENCY_LEVEL
+              if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+                struct.consistency_level = 
ConsistencyLevel.findByValue(iprot.readI32());
+                struct.setConsistency_levelIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
           }
@@ -20777,6 +20894,11 @@ public class Cassandra {
           }
           oprot.writeFieldEnd();
         }
+        if (struct.consistency_level != null) {
+          oprot.writeFieldBegin(CONSISTENCY_LEVEL_FIELD_DESC);
+          oprot.writeI32(struct.consistency_level.getValue());
+          oprot.writeFieldEnd();
+        }
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -20796,6 +20918,7 @@ public class Cassandra {
         TTupleProtocol oprot = (TTupleProtocol) prot;
         oprot.writeBinary(struct.key);
         oprot.writeString(struct.column_family);
+        oprot.writeI32(struct.consistency_level.getValue());
         BitSet optionals = new BitSet();
         if (struct.isSetExpected()) {
           optionals.set(0);
@@ -20831,6 +20954,8 @@ public class Cassandra {
         struct.setKeyIsSet(true);
         struct.column_family = iprot.readString();
         struct.setColumn_familyIsSet(true);
+        struct.consistency_level = 
ConsistencyLevel.findByValue(iprot.readI32());
+        struct.setConsistency_levelIsSet(true);
         BitSet incoming = iprot.readBitSet(2);
         if (incoming.get(0)) {
           {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
----------------------------------------------------------------------
diff --git 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
index 3112c98..24662a7 100644
--- 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
+++ 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
@@ -60,6 +60,7 @@ public class TimedOutException extends TException implements 
org.apache.thrift.T
 
   private static final org.apache.thrift.protocol.TField 
ACKNOWLEDGED_BY_FIELD_DESC = new 
org.apache.thrift.protocol.TField("acknowledged_by", 
org.apache.thrift.protocol.TType.I32, (short)1);
   private static final org.apache.thrift.protocol.TField 
ACKNOWLEDGED_BY_BATCHLOG_FIELD_DESC = new 
org.apache.thrift.protocol.TField("acknowledged_by_batchlog", 
org.apache.thrift.protocol.TType.BOOL, (short)2);
+  private static final org.apache.thrift.protocol.TField 
PAXOS_IN_PROGRESS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("paxos_in_progress", 
org.apache.thrift.protocol.TType.BOOL, (short)3);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -75,9 +76,15 @@ public class TimedOutException extends TException implements 
org.apache.thrift.T
    */
   public int acknowledged_by; // optional
   /**
-   * in case of atomic_batch_mutate method this field tells if the batch was 
written to the batchlog.
+   * in case of atomic_batch_mutate method this field tells if the batch
+   * was written to the batchlog.
    */
   public boolean acknowledged_by_batchlog; // optional
+  /**
+   * for the CAS method, this field tells if we timed out during the paxos
+   * protocol, as opposed to during the commit of our update
+   */
+  public boolean paxos_in_progress; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -89,9 +96,15 @@ public class TimedOutException extends TException implements 
org.apache.thrift.T
      */
     ACKNOWLEDGED_BY((short)1, "acknowledged_by"),
     /**
-     * in case of atomic_batch_mutate method this field tells if the batch was 
written to the batchlog.
+     * in case of atomic_batch_mutate method this field tells if the batch
+     * was written to the batchlog.
+     */
+    ACKNOWLEDGED_BY_BATCHLOG((short)2, "acknowledged_by_batchlog"),
+    /**
+     * for the CAS method, this field tells if we timed out during the paxos
+     * protocol, as opposed to during the commit of our update
      */
-    ACKNOWLEDGED_BY_BATCHLOG((short)2, "acknowledged_by_batchlog");
+    PAXOS_IN_PROGRESS((short)3, "paxos_in_progress");
 
     private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -110,6 +123,8 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
           return ACKNOWLEDGED_BY;
         case 2: // ACKNOWLEDGED_BY_BATCHLOG
           return ACKNOWLEDGED_BY_BATCHLOG;
+        case 3: // PAXOS_IN_PROGRESS
+          return PAXOS_IN_PROGRESS;
         default:
           return null;
       }
@@ -152,8 +167,9 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
   // isset id assignments
   private static final int __ACKNOWLEDGED_BY_ISSET_ID = 0;
   private static final int __ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID = 1;
+  private static final int __PAXOS_IN_PROGRESS_ISSET_ID = 2;
   private byte __isset_bitfield = 0;
-  private _Fields optionals[] = 
{_Fields.ACKNOWLEDGED_BY,_Fields.ACKNOWLEDGED_BY_BATCHLOG};
+  private _Fields optionals[] = 
{_Fields.ACKNOWLEDGED_BY,_Fields.ACKNOWLEDGED_BY_BATCHLOG,_Fields.PAXOS_IN_PROGRESS};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -161,6 +177,8 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
         new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     tmpMap.put(_Fields.ACKNOWLEDGED_BY_BATCHLOG, new 
org.apache.thrift.meta_data.FieldMetaData("acknowledged_by_batchlog", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+    tmpMap.put(_Fields.PAXOS_IN_PROGRESS, new 
org.apache.thrift.meta_data.FieldMetaData("paxos_in_progress", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TimedOutException.class,
 metaDataMap);
   }
@@ -175,6 +193,7 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
     __isset_bitfield = other.__isset_bitfield;
     this.acknowledged_by = other.acknowledged_by;
     this.acknowledged_by_batchlog = other.acknowledged_by_batchlog;
+    this.paxos_in_progress = other.paxos_in_progress;
   }
 
   public TimedOutException deepCopy() {
@@ -187,6 +206,8 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
     this.acknowledged_by = 0;
     setAcknowledged_by_batchlogIsSet(false);
     this.acknowledged_by_batchlog = false;
+    setPaxos_in_progressIsSet(false);
+    this.paxos_in_progress = false;
   }
 
   /**
@@ -225,14 +246,16 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
   }
 
   /**
-   * in case of atomic_batch_mutate method this field tells if the batch was 
written to the batchlog.
+   * in case of atomic_batch_mutate method this field tells if the batch
+   * was written to the batchlog.
    */
   public boolean isAcknowledged_by_batchlog() {
     return this.acknowledged_by_batchlog;
   }
 
   /**
-   * in case of atomic_batch_mutate method this field tells if the batch was 
written to the batchlog.
+   * in case of atomic_batch_mutate method this field tells if the batch
+   * was written to the batchlog.
    */
   public TimedOutException setAcknowledged_by_batchlog(boolean 
acknowledged_by_batchlog) {
     this.acknowledged_by_batchlog = acknowledged_by_batchlog;
@@ -253,6 +276,37 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
     __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, 
__ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID, value);
   }
 
+  /**
+   * for the CAS method, this field tells if we timed out during the paxos
+   * protocol, as opposed to during the commit of our update
+   */
+  public boolean isPaxos_in_progress() {
+    return this.paxos_in_progress;
+  }
+
+  /**
+   * for the CAS method, this field tells if we timed out during the paxos
+   * protocol, as opposed to during the commit of our update
+   */
+  public TimedOutException setPaxos_in_progress(boolean paxos_in_progress) {
+    this.paxos_in_progress = paxos_in_progress;
+    setPaxos_in_progressIsSet(true);
+    return this;
+  }
+
+  public void unsetPaxos_in_progress() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, 
__PAXOS_IN_PROGRESS_ISSET_ID);
+  }
+
+  /** Returns true if field paxos_in_progress is set (has been assigned a 
value) and false otherwise */
+  public boolean isSetPaxos_in_progress() {
+    return EncodingUtils.testBit(__isset_bitfield, 
__PAXOS_IN_PROGRESS_ISSET_ID);
+  }
+
+  public void setPaxos_in_progressIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, 
__PAXOS_IN_PROGRESS_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case ACKNOWLEDGED_BY:
@@ -271,6 +325,14 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
       }
       break;
 
+    case PAXOS_IN_PROGRESS:
+      if (value == null) {
+        unsetPaxos_in_progress();
+      } else {
+        setPaxos_in_progress((Boolean)value);
+      }
+      break;
+
     }
   }
 
@@ -282,6 +344,9 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
     case ACKNOWLEDGED_BY_BATCHLOG:
       return Boolean.valueOf(isAcknowledged_by_batchlog());
 
+    case PAXOS_IN_PROGRESS:
+      return Boolean.valueOf(isPaxos_in_progress());
+
     }
     throw new IllegalStateException();
   }
@@ -297,6 +362,8 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
       return isSetAcknowledged_by();
     case ACKNOWLEDGED_BY_BATCHLOG:
       return isSetAcknowledged_by_batchlog();
+    case PAXOS_IN_PROGRESS:
+      return isSetPaxos_in_progress();
     }
     throw new IllegalStateException();
   }
@@ -332,6 +399,15 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
         return false;
     }
 
+    boolean this_present_paxos_in_progress = true && 
this.isSetPaxos_in_progress();
+    boolean that_present_paxos_in_progress = true && 
that.isSetPaxos_in_progress();
+    if (this_present_paxos_in_progress || that_present_paxos_in_progress) {
+      if (!(this_present_paxos_in_progress && that_present_paxos_in_progress))
+        return false;
+      if (this.paxos_in_progress != that.paxos_in_progress)
+        return false;
+    }
+
     return true;
   }
 
@@ -349,6 +425,11 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
     if (present_acknowledged_by_batchlog)
       builder.append(acknowledged_by_batchlog);
 
+    boolean present_paxos_in_progress = true && (isSetPaxos_in_progress());
+    builder.append(present_paxos_in_progress);
+    if (present_paxos_in_progress)
+      builder.append(paxos_in_progress);
+
     return builder.toHashCode();
   }
 
@@ -380,6 +461,16 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
         return lastComparison;
       }
     }
+    lastComparison = 
Boolean.valueOf(isSetPaxos_in_progress()).compareTo(typedOther.isSetPaxos_in_progress());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetPaxos_in_progress()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.paxos_in_progress, 
typedOther.paxos_in_progress);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -411,6 +502,12 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
       sb.append(this.acknowledged_by_batchlog);
       first = false;
     }
+    if (isSetPaxos_in_progress()) {
+      if (!first) sb.append(", ");
+      sb.append("paxos_in_progress:");
+      sb.append(this.paxos_in_progress);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -472,6 +569,14 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 3: // PAXOS_IN_PROGRESS
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.paxos_in_progress = iprot.readBool();
+              struct.setPaxos_in_progressIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -497,6 +602,11 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
         oprot.writeBool(struct.acknowledged_by_batchlog);
         oprot.writeFieldEnd();
       }
+      if (struct.isSetPaxos_in_progress()) {
+        oprot.writeFieldBegin(PAXOS_IN_PROGRESS_FIELD_DESC);
+        oprot.writeBool(struct.paxos_in_progress);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -521,19 +631,25 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
       if (struct.isSetAcknowledged_by_batchlog()) {
         optionals.set(1);
       }
-      oprot.writeBitSet(optionals, 2);
+      if (struct.isSetPaxos_in_progress()) {
+        optionals.set(2);
+      }
+      oprot.writeBitSet(optionals, 3);
       if (struct.isSetAcknowledged_by()) {
         oprot.writeI32(struct.acknowledged_by);
       }
       if (struct.isSetAcknowledged_by_batchlog()) {
         oprot.writeBool(struct.acknowledged_by_batchlog);
       }
+      if (struct.isSetPaxos_in_progress()) {
+        oprot.writeBool(struct.paxos_in_progress);
+      }
     }
 
     @Override
     public void read(org.apache.thrift.protocol.TProtocol prot, 
TimedOutException struct) throws org.apache.thrift.TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
-      BitSet incoming = iprot.readBitSet(2);
+      BitSet incoming = iprot.readBitSet(3);
       if (incoming.get(0)) {
         struct.acknowledged_by = iprot.readI32();
         struct.setAcknowledged_byIsSet(true);
@@ -542,6 +658,10 @@ public class TimedOutException extends TException 
implements org.apache.thrift.T
         struct.acknowledged_by_batchlog = iprot.readBool();
         struct.setAcknowledged_by_batchlogIsSet(true);
       }
+      if (incoming.get(2)) {
+        struct.paxos_in_progress = iprot.readBool();
+        struct.setPaxos_in_progressIsSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 133a8ff..7766f94 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -371,7 +371,7 @@ public abstract class ModificationStatement implements 
CQLStatement
         ColumnFamily updates = updateForKey(key, clusteringPrefix, params);
         ColumnFamily expected = buildConditions(key, clusteringPrefix, params);
 
-        boolean result = StorageProxy.cas(keyspace(), columnFamily(), key, 
expected, updates);
+        boolean result = StorageProxy.cas(keyspace(), columnFamily(), key, 
expected, updates, cl);
 
         ResultSet.Metadata metadata = new 
ResultSet.Metadata(Collections.singletonList(new 
ColumnSpecification(keyspace(), columnFamily(), RESULT_COLUMN, 
BooleanType.instance)));
         List<List<ByteBuffer>> newRows = 
Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(result)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index aec3c2d..e62da1b 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -281,6 +281,19 @@ public enum ConsistencyLevel
         }
     }
 
+    public void validateForCas(String table) throws InvalidRequestException
+    {
+        switch (this)
+        {
+            case LOCAL_QUORUM:
+            case EACH_QUORUM:
+                requireNetworkTopologyStrategy(table);
+                break;
+            case ANY:
+                throw new InvalidRequestException("ANY is not supported with 
CAS. Use SERIAL if you mean, make sure it is accepted but I don't care how many 
replicas commit it for non-SERIAL reads");
+        }
+    }
+
     public void validateCounterForWrite(CFMetaData metadata) throws 
InvalidRequestException
     {
         if (this == ConsistencyLevel.ANY)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java 
b/src/java/org/apache/cassandra/db/WriteResponse.java
index b89682c..83b579a 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -26,9 +26,7 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
 /*
- * This message is sent back the row mutation verb handler
- * and basically specifies if the write succeeded or not for a particular
- * key in a table
+ * This empty response is sent by a replica to inform the coordinator that the 
write succeeded
  */
 public class WriteResponse
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 829fc0a..7e288ab 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -195,9 +195,11 @@ public class StorageProxy implements StorageProxyMBean
      *
      * @return true if the operation succeeds in updating the row
      */
-    public static boolean cas(String table, String cfName, ByteBuffer key, 
ColumnFamily expected, ColumnFamily updates)
+    public static boolean cas(String table, String cfName, ByteBuffer key, 
ColumnFamily expected, ColumnFamily updates, ConsistencyLevel consistencyLevel)
     throws UnavailableException, IsBootstrappingException, 
ReadTimeoutException, WriteTimeoutException, InvalidRequestException
     {
+        consistencyLevel.validateForCas(table);
+
         CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
 
         long start = System.nanoTime();
@@ -232,7 +234,10 @@ public class StorageProxy implements StorageProxyMBean
             Tracing.trace("CAS precondition is met; proposing client-requested 
updates for {}", ballot);
             if (proposePaxos(proposal, liveEndpoints, requiredParticipants))
             {
-                commitPaxos(proposal, liveEndpoints);
+                if (consistencyLevel == ConsistencyLevel.SERIAL)
+                    sendCommit(proposal, liveEndpoints);
+                else
+                    commitPaxos(proposal, consistencyLevel);
                 Tracing.trace("CAS successful");
                 return true;
             }
@@ -322,8 +327,17 @@ public class StorageProxy implements StorageProxyMBean
         {
             Tracing.trace("Finishing incomplete paxos round {}", inProgress);
             if (proposePaxos(inProgress, liveEndpoints, requiredParticipants))
-                commitPaxos(inProgress, liveEndpoints);
-            // no need to sleep here
+            {
+                try
+                {
+                    commitPaxos(inProgress, ConsistencyLevel.QUORUM);
+                }
+                catch (WriteTimeoutException e)
+                {
+                    // let caller retry or turn it into a cas timeout, since 
it's someone elses' write we're applying
+                    return null;
+                }
+            }
             return null;
         }
 
@@ -335,7 +349,7 @@ public class StorageProxy implements StorageProxyMBean
         if (Iterables.size(missingMRC) > 0)
         {
             Tracing.trace("Repairing replicas that missed the most recent 
commit");
-            commitPaxos(mostRecent, missingMRC);
+            sendCommit(mostRecent, missingMRC);
             // TODO: provided commits don't invalid the prepare we just did 
above (which they don't), we could just wait
             // for all the missingMRC to acknowledge this commit and then move 
on with proposing our value. But that means
             // adding the ability to have commitPaxos block, which is exactly 
CASSANDRA-5442 will do. So once we have that
@@ -346,6 +360,16 @@ public class StorageProxy implements StorageProxyMBean
         return ballot;
     }
 
+    /**
+     * Unlike commitPaxos, this does not wait for replies
+     */
+    private static void sendCommit(Commit commit, Iterable<InetAddress> 
replicas)
+    {
+        MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, commit, 
Commit.serializer);
+        for (InetAddress target : replicas)
+            MessagingService.instance().sendOneWay(message, target);
+    }
+
     private static PrepareCallback preparePaxos(Commit toPrepare, 
List<InetAddress> endpoints, int requiredParticipants)
     throws WriteTimeoutException
     {
@@ -369,11 +393,25 @@ public class StorageProxy implements StorageProxyMBean
         return callback.getSuccessful() >= requiredParticipants;
     }
 
-    private static void commitPaxos(Commit proposal, Iterable<InetAddress> 
endpoints)
+    private static void commitPaxos(Commit proposal, ConsistencyLevel 
consistencyLevel) throws WriteTimeoutException
     {
+        Table table = Table.open(proposal.update.metadata().ksName);
+
+        Token tk = StorageService.getPartitioner().getToken(proposal.key);
+        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(table.getName(), tk);
+        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
table.getName());
+
+        AbstractReplicationStrategy rs = table.getReplicationStrategy();
+        AbstractWriteResponseHandler responseHandler = 
rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, 
consistencyLevel, null, WriteType.SIMPLE);
+
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, 
Commit.serializer);
-        for (InetAddress target : endpoints)
-            MessagingService.instance().sendOneWay(message, target);
+        for (InetAddress destination : Iterables.concat(naturalEndpoints, 
pendingEndpoints))
+        {
+            if (FailureDetector.instance.isAlive(destination))
+                MessagingService.instance().sendRR(message, destination, 
responseHandler);
+        }
+
+        responseHandler.get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java 
b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
index d7944b6..190d654 100644
--- a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
@@ -1,12 +1,19 @@
 package org.apache.cassandra.service.paxos;
 
+import org.apache.cassandra.db.WriteResponse;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
 
 public class CommitVerbHandler implements IVerbHandler<Commit>
 {
     public void doVerb(MessageIn<Commit> message, int id)
     {
         PaxosState.commit(message.payload);
+
+        WriteResponse response = new WriteResponse();
+        Tracing.trace("Enqueuing acknowledge to {}", message.from);
+        MessagingService.instance().sendReply(response.createMessage(), id, 
message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 4c188f7..870cdbd 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -724,7 +724,7 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    public boolean cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates)
+    public boolean cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         if (startSessionIfRequested())
@@ -780,7 +780,7 @@ public class CassandraServer implements Cassandra.Iface
             }
 
             schedule(DatabaseDescriptor.getWriteRpcTimeout());
-            return StorageProxy.cas(cState.getKeyspace(), column_family, key, 
cfExpected, cfUpdates);
+            return StorageProxy.cas(cState.getKeyspace(), column_family, key, 
cfExpected, cfUpdates, ThriftConversion.fromThrift(consistency_level));
         }
         catch (RequestTimeoutException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java 
b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 0642129..28725f0 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -84,6 +84,8 @@ public class ThriftConversion
                 toe.setAcknowledged_by_batchlog(false);
             else if (wte.writeType == WriteType.BATCH)
                 toe.setAcknowledged_by_batchlog(true);
+            else if (wte.writeType == WriteType.CAS)
+                toe.setPaxos_in_progress(true);
         }
         return toe;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/test/system/test_thrift_server.py
----------------------------------------------------------------------
diff --git a/test/system/test_thrift_server.py 
b/test/system/test_thrift_server.py
index df224e3..6161fe0 100644
--- a/test/system/test_thrift_server.py
+++ b/test/system/test_thrift_server.py
@@ -232,15 +232,18 @@ class TestMutations(ThriftTester):
 
     def test_cas(self):
         _set_keyspace('Keyspace1')
-        assert not client.cas('key1', 'Standard1', _SIMPLE_COLUMNS, 
_SIMPLE_COLUMNS)
+        def cas(expected, updates):
+            return client.cas('key1', 'Standard1', expected, updates, 
ConsistencyLevel.ONE)
 
-        assert client.cas('key1', 'Standard1', None, _SIMPLE_COLUMNS)
+        assert not cas(_SIMPLE_COLUMNS, _SIMPLE_COLUMNS)
+
+        assert cas(None, _SIMPLE_COLUMNS)
 
         result = [cosc.column for cosc in _big_slice('key1', 
ColumnParent('Standard1'))]
         # CAS will use its own timestamp, so we can't just compare result == 
_SIMPLE_COLUMNS
         assert dict((c.name, c.value) for c in result) == dict((c.name, 
c.value) for c in _SIMPLE_COLUMNS), result
 
-        assert not client.cas('key1', 'Standard1', None, _SIMPLE_COLUMNS)
+        assert not cas(None, _SIMPLE_COLUMNS)
 
         # CL.SERIAL for reads
         assert client.get('key1', ColumnPath('Standard1', column='c1'), 
ConsistencyLevel.SERIAL).column.value == 'value1'

Reply via email to