Updated Branches: refs/heads/trunk 0387cf587 -> bc3597d35
Add support for specifying CAS commit ConsistencyLevel patch by jbellis; reviewed by aleksey for CASSANDRA-5442 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc3597d3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc3597d3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc3597d3 Branch: refs/heads/trunk Commit: bc3597d3549850997fd137cc8b74700c62cebf64 Parents: 0387cf5 Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon May 27 14:31:44 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon May 27 14:31:44 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- interface/cassandra.thrift | 14 +- .../org/apache/cassandra/thrift/Cassandra.java | 147 +++++++++++++- .../apache/cassandra/thrift/TimedOutException.java | 136 +++++++++++++- .../cql3/statements/ModificationStatement.java | 2 +- .../org/apache/cassandra/db/ConsistencyLevel.java | 13 ++ .../org/apache/cassandra/db/WriteResponse.java | 4 +- .../org/apache/cassandra/service/StorageProxy.java | 54 +++++- .../cassandra/service/paxos/CommitVerbHandler.java | 7 + .../apache/cassandra/thrift/CassandraServer.java | 4 +- .../apache/cassandra/thrift/ThriftConversion.java | 2 + test/system/test_thrift_server.py | 9 +- 12 files changed, 354 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0ce9e63..e233ba0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,7 +9,7 @@ * Removed compatibility with pre-1.2.5 sstables and network messages (CASSANDRA-5511) * removed PBSPredictor (CASSANDRA-5455) - * CAS support (CASSANDRA-5062, 5441, 5443) + * CAS support (CASSANDRA-5062, 5441, 5442, 5443) * Leveled compaction performs 
size-tiered compactions in L0 (CASSANDRA-5371, 5439) * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/interface/cassandra.thrift ---------------------------------------------------------------------- diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index 1e78d51..cfdaf88 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -148,10 +148,17 @@ exception TimedOutException { */ 1: optional i32 acknowledged_by - /** - * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog. + /** + * in case of atomic_batch_mutate method this field tells if the batch + * was written to the batchlog. */ 2: optional bool acknowledged_by_batchlog + + /** + * for the CAS method, this field tells if we timed out during the paxos + * protocol, as opposed to during the commit of our update + */ + 3: optional bool paxos_in_progress } /** invalid authentication request (invalid keyspace, user does not exist, or credentials invalid) */ @@ -643,7 +650,8 @@ service Cassandra { bool cas(1:required binary key, 2:required string column_family, 3:list<Column> expected, - 4:list<Column> updates) + 4:list<Column> updates, + 5:required ConsistencyLevel consistency_level=ConsistencyLevel.QUORUM) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java index 34aec98..a2761ca 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java @@ -170,8 +170,9 @@ 
public class Cassandra { * @param column_family * @param expected * @param updates + * @param consistency_level */ - public boolean cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException; + public boolean cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException; /** * Remove data from the row specified by key at the granularity specified by column_path, and the given timestamp. Note @@ -424,7 +425,7 @@ public class Cassandra { public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.add_call> resultHandler) throws org.apache.thrift.TException; - public void cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.cas_call> resultHandler) throws org.apache.thrift.TException; + public void cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, ConsistencyLevel consistency_level, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.cas_call> resultHandler) throws org.apache.thrift.TException; public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.remove_call> resultHandler) throws org.apache.thrift.TException; @@ -903,19 +904,20 @@ public class Cassandra { return; } - public boolean cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException + public boolean cas(ByteBuffer key, String column_family, 
List<Column> expected, List<Column> updates, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException { - send_cas(key, column_family, expected, updates); + send_cas(key, column_family, expected, updates, consistency_level); return recv_cas(); } - public void send_cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates) throws org.apache.thrift.TException + public void send_cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, ConsistencyLevel consistency_level) throws org.apache.thrift.TException { cas_args args = new cas_args(); args.setKey(key); args.setColumn_family(column_family); args.setExpected(expected); args.setUpdates(updates); + args.setConsistency_level(consistency_level); sendBase("cas", args); } @@ -2274,9 +2276,9 @@ public class Cassandra { } } - public void cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, org.apache.thrift.async.AsyncMethodCallback<cas_call> resultHandler) throws org.apache.thrift.TException { + public void cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, ConsistencyLevel consistency_level, org.apache.thrift.async.AsyncMethodCallback<cas_call> resultHandler) throws org.apache.thrift.TException { checkReady(); - cas_call method_call = new cas_call(key, column_family, expected, updates, resultHandler, this, ___protocolFactory, ___transport); + cas_call method_call = new cas_call(key, column_family, expected, updates, consistency_level, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -2286,12 +2288,14 @@ public class Cassandra { private String column_family; private List<Column> expected; private List<Column> updates; - public cas_call(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, 
org.apache.thrift.async.AsyncMethodCallback<cas_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private ConsistencyLevel consistency_level; + public cas_call(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, ConsistencyLevel consistency_level, org.apache.thrift.async.AsyncMethodCallback<cas_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.key = key; this.column_family = column_family; this.expected = expected; this.updates = updates; + this.consistency_level = consistency_level; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -2301,6 +2305,7 @@ public class Cassandra { args.setColumn_family(column_family); args.setExpected(expected); args.setUpdates(updates); + args.setConsistency_level(consistency_level); args.write(prot); prot.writeMessageEnd(); } @@ -3722,7 +3727,7 @@ public class Cassandra { public cas_result getResult(I iface, cas_args args) throws org.apache.thrift.TException { cas_result result = new cas_result(); try { - result.success = iface.cas(args.key, args.column_family, args.expected, args.updates); + result.success = iface.cas(args.key, args.column_family, args.expected, args.updates, args.consistency_level); result.setSuccessIsSet(true); } catch (InvalidRequestException ire) { result.ire = ire; @@ -20085,6 +20090,7 @@ public class Cassandra { private static final org.apache.thrift.protocol.TField COLUMN_FAMILY_FIELD_DESC = new org.apache.thrift.protocol.TField("column_family", org.apache.thrift.protocol.TType.STRING, (short)2); private 
static final org.apache.thrift.protocol.TField EXPECTED_FIELD_DESC = new org.apache.thrift.protocol.TField("expected", org.apache.thrift.protocol.TType.LIST, (short)3); private static final org.apache.thrift.protocol.TField UPDATES_FIELD_DESC = new org.apache.thrift.protocol.TField("updates", org.apache.thrift.protocol.TType.LIST, (short)4); + private static final org.apache.thrift.protocol.TField CONSISTENCY_LEVEL_FIELD_DESC = new org.apache.thrift.protocol.TField("consistency_level", org.apache.thrift.protocol.TType.I32, (short)5); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -20096,13 +20102,23 @@ public class Cassandra { public String column_family; // required public List<Column> expected; // required public List<Column> updates; // required + /** + * + * @see ConsistencyLevel + */ + public ConsistencyLevel consistency_level; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { KEY((short)1, "key"), COLUMN_FAMILY((short)2, "column_family"), EXPECTED((short)3, "expected"), - UPDATES((short)4, "updates"); + UPDATES((short)4, "updates"), + /** + * + * @see ConsistencyLevel + */ + CONSISTENCY_LEVEL((short)5, "consistency_level"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -20125,6 +20141,8 @@ public class Cassandra { return EXPECTED; case 4: // UPDATES return UPDATES; + case 5: // CONSISTENCY_LEVEL + return CONSISTENCY_LEVEL; default: return null; } @@ -20178,24 +20196,30 @@ public class Cassandra { tmpMap.put(_Fields.UPDATES, new org.apache.thrift.meta_data.FieldMetaData("updates", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, Column.class)))); + tmpMap.put(_Fields.CONSISTENCY_LEVEL, new org.apache.thrift.meta_data.FieldMetaData("consistency_level", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, ConsistencyLevel.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(cas_args.class, metaDataMap); } public cas_args() { + this.consistency_level = org.apache.cassandra.thrift.ConsistencyLevel.QUORUM; + } public cas_args( ByteBuffer key, String column_family, List<Column> expected, - List<Column> updates) + List<Column> updates, + ConsistencyLevel consistency_level) { this(); this.key = key; this.column_family = column_family; this.expected = expected; this.updates = updates; + this.consistency_level = consistency_level; } /** @@ -20223,6 +20247,9 @@ public class Cassandra { } this.updates = __this__updates; } + if (other.isSetConsistency_level()) { + this.consistency_level = other.consistency_level; + } } public 
cas_args deepCopy() { @@ -20235,6 +20262,8 @@ public class Cassandra { this.column_family = null; this.expected = null; this.updates = null; + this.consistency_level = org.apache.cassandra.thrift.ConsistencyLevel.QUORUM; + } public byte[] getKey() { @@ -20373,6 +20402,38 @@ public class Cassandra { } } + /** + * + * @see ConsistencyLevel + */ + public ConsistencyLevel getConsistency_level() { + return this.consistency_level; + } + + /** + * + * @see ConsistencyLevel + */ + public cas_args setConsistency_level(ConsistencyLevel consistency_level) { + this.consistency_level = consistency_level; + return this; + } + + public void unsetConsistency_level() { + this.consistency_level = null; + } + + /** Returns true if field consistency_level is set (has been assigned a value) and false otherwise */ + public boolean isSetConsistency_level() { + return this.consistency_level != null; + } + + public void setConsistency_levelIsSet(boolean value) { + if (!value) { + this.consistency_level = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case KEY: @@ -20407,6 +20468,14 @@ public class Cassandra { } break; + case CONSISTENCY_LEVEL: + if (value == null) { + unsetConsistency_level(); + } else { + setConsistency_level((ConsistencyLevel)value); + } + break; + } } @@ -20424,6 +20493,9 @@ public class Cassandra { case UPDATES: return getUpdates(); + case CONSISTENCY_LEVEL: + return getConsistency_level(); + } throw new IllegalStateException(); } @@ -20443,6 +20515,8 @@ public class Cassandra { return isSetExpected(); case UPDATES: return isSetUpdates(); + case CONSISTENCY_LEVEL: + return isSetConsistency_level(); } throw new IllegalStateException(); } @@ -20496,6 +20570,15 @@ public class Cassandra { return false; } + boolean this_present_consistency_level = true && this.isSetConsistency_level(); + boolean that_present_consistency_level = true && that.isSetConsistency_level(); + if (this_present_consistency_level || 
that_present_consistency_level) { + if (!(this_present_consistency_level && that_present_consistency_level)) + return false; + if (!this.consistency_level.equals(that.consistency_level)) + return false; + } + return true; } @@ -20523,6 +20606,11 @@ public class Cassandra { if (present_updates) builder.append(updates); + boolean present_consistency_level = true && (isSetConsistency_level()); + builder.append(present_consistency_level); + if (present_consistency_level) + builder.append(consistency_level.getValue()); + return builder.toHashCode(); } @@ -20574,6 +20662,16 @@ public class Cassandra { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetConsistency_level()).compareTo(typedOther.isSetConsistency_level()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetConsistency_level()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.consistency_level, typedOther.consistency_level); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -20625,6 +20723,14 @@ public class Cassandra { sb.append(this.updates); } first = false; + if (!first) sb.append(", "); + sb.append("consistency_level:"); + if (this.consistency_level == null) { + sb.append("null"); + } else { + sb.append(this.consistency_level); + } + first = false; sb.append(")"); return sb.toString(); } @@ -20637,6 +20743,9 @@ public class Cassandra { if (column_family == null) { throw new org.apache.thrift.protocol.TProtocolException("Required field 'column_family' was not present! Struct: " + toString()); } + if (consistency_level == null) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'consistency_level' was not present! 
Struct: " + toString()); + } // check for sub-struct validity } @@ -20728,6 +20837,14 @@ public class Cassandra { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // CONSISTENCY_LEVEL + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.consistency_level = ConsistencyLevel.findByValue(iprot.readI32()); + struct.setConsistency_levelIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -20777,6 +20894,11 @@ public class Cassandra { } oprot.writeFieldEnd(); } + if (struct.consistency_level != null) { + oprot.writeFieldBegin(CONSISTENCY_LEVEL_FIELD_DESC); + oprot.writeI32(struct.consistency_level.getValue()); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -20796,6 +20918,7 @@ public class Cassandra { TTupleProtocol oprot = (TTupleProtocol) prot; oprot.writeBinary(struct.key); oprot.writeString(struct.column_family); + oprot.writeI32(struct.consistency_level.getValue()); BitSet optionals = new BitSet(); if (struct.isSetExpected()) { optionals.set(0); @@ -20831,6 +20954,8 @@ public class Cassandra { struct.setKeyIsSet(true); struct.column_family = iprot.readString(); struct.setColumn_familyIsSet(true); + struct.consistency_level = ConsistencyLevel.findByValue(iprot.readI32()); + struct.setConsistency_levelIsSet(true); BitSet incoming = iprot.readBitSet(2); if (incoming.get(0)) { { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java index 3112c98..24662a7 100644 --- 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java @@ -60,6 +60,7 @@ public class TimedOutException extends TException implements org.apache.thrift.T private static final org.apache.thrift.protocol.TField ACKNOWLEDGED_BY_FIELD_DESC = new org.apache.thrift.protocol.TField("acknowledged_by", org.apache.thrift.protocol.TType.I32, (short)1); private static final org.apache.thrift.protocol.TField ACKNOWLEDGED_BY_BATCHLOG_FIELD_DESC = new org.apache.thrift.protocol.TField("acknowledged_by_batchlog", org.apache.thrift.protocol.TType.BOOL, (short)2); + private static final org.apache.thrift.protocol.TField PAXOS_IN_PROGRESS_FIELD_DESC = new org.apache.thrift.protocol.TField("paxos_in_progress", org.apache.thrift.protocol.TType.BOOL, (short)3); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -75,9 +76,15 @@ public class TimedOutException extends TException implements org.apache.thrift.T */ public int acknowledged_by; // optional /** - * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog. + * in case of atomic_batch_mutate method this field tells if the batch + * was written to the batchlog. */ public boolean acknowledged_by_batchlog; // optional + /** + * for the CAS method, this field tells if we timed out during the paxos + * protocol, as opposed to during the commit of our update + */ + public boolean paxos_in_progress; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -89,9 +96,15 @@ public class TimedOutException extends TException implements org.apache.thrift.T */ ACKNOWLEDGED_BY((short)1, "acknowledged_by"), /** - * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog. + * in case of atomic_batch_mutate method this field tells if the batch + * was written to the batchlog. + */ + ACKNOWLEDGED_BY_BATCHLOG((short)2, "acknowledged_by_batchlog"), + /** + * for the CAS method, this field tells if we timed out during the paxos + * protocol, as opposed to during the commit of our update */ - ACKNOWLEDGED_BY_BATCHLOG((short)2, "acknowledged_by_batchlog"); + PAXOS_IN_PROGRESS((short)3, "paxos_in_progress"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -110,6 +123,8 @@ public class TimedOutException extends TException implements org.apache.thrift.T return ACKNOWLEDGED_BY; case 2: // ACKNOWLEDGED_BY_BATCHLOG return ACKNOWLEDGED_BY_BATCHLOG; + case 3: // PAXOS_IN_PROGRESS + return PAXOS_IN_PROGRESS; default: return null; } @@ -152,8 +167,9 @@ public class TimedOutException extends TException implements org.apache.thrift.T // isset id assignments private static final int __ACKNOWLEDGED_BY_ISSET_ID = 0; private static final int __ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID = 1; + private static final int __PAXOS_IN_PROGRESS_ISSET_ID = 2; private byte __isset_bitfield = 0; - private _Fields optionals[] = {_Fields.ACKNOWLEDGED_BY,_Fields.ACKNOWLEDGED_BY_BATCHLOG}; + private _Fields optionals[] = {_Fields.ACKNOWLEDGED_BY,_Fields.ACKNOWLEDGED_BY_BATCHLOG,_Fields.PAXOS_IN_PROGRESS}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -161,6 +177,8 @@ public class TimedOutException extends TException implements 
org.apache.thrift.T new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.ACKNOWLEDGED_BY_BATCHLOG, new org.apache.thrift.meta_data.FieldMetaData("acknowledged_by_batchlog", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.PAXOS_IN_PROGRESS, new org.apache.thrift.meta_data.FieldMetaData("paxos_in_progress", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TimedOutException.class, metaDataMap); } @@ -175,6 +193,7 @@ public class TimedOutException extends TException implements org.apache.thrift.T __isset_bitfield = other.__isset_bitfield; this.acknowledged_by = other.acknowledged_by; this.acknowledged_by_batchlog = other.acknowledged_by_batchlog; + this.paxos_in_progress = other.paxos_in_progress; } public TimedOutException deepCopy() { @@ -187,6 +206,8 @@ public class TimedOutException extends TException implements org.apache.thrift.T this.acknowledged_by = 0; setAcknowledged_by_batchlogIsSet(false); this.acknowledged_by_batchlog = false; + setPaxos_in_progressIsSet(false); + this.paxos_in_progress = false; } /** @@ -225,14 +246,16 @@ public class TimedOutException extends TException implements org.apache.thrift.T } /** - * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog. + * in case of atomic_batch_mutate method this field tells if the batch + * was written to the batchlog. */ public boolean isAcknowledged_by_batchlog() { return this.acknowledged_by_batchlog; } /** - * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog. 
+ * in case of atomic_batch_mutate method this field tells if the batch + * was written to the batchlog. */ public TimedOutException setAcknowledged_by_batchlog(boolean acknowledged_by_batchlog) { this.acknowledged_by_batchlog = acknowledged_by_batchlog; @@ -253,6 +276,37 @@ public class TimedOutException extends TException implements org.apache.thrift.T __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID, value); } + /** + * for the CAS method, this field tells if we timed out during the paxos + * protocol, as opposed to during the commit of our update + */ + public boolean isPaxos_in_progress() { + return this.paxos_in_progress; + } + + /** + * for the CAS method, this field tells if we timed out during the paxos + * protocol, as opposed to during the commit of our update + */ + public TimedOutException setPaxos_in_progress(boolean paxos_in_progress) { + this.paxos_in_progress = paxos_in_progress; + setPaxos_in_progressIsSet(true); + return this; + } + + public void unsetPaxos_in_progress() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PAXOS_IN_PROGRESS_ISSET_ID); + } + + /** Returns true if field paxos_in_progress is set (has been assigned a value) and false otherwise */ + public boolean isSetPaxos_in_progress() { + return EncodingUtils.testBit(__isset_bitfield, __PAXOS_IN_PROGRESS_ISSET_ID); + } + + public void setPaxos_in_progressIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PAXOS_IN_PROGRESS_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case ACKNOWLEDGED_BY: @@ -271,6 +325,14 @@ public class TimedOutException extends TException implements org.apache.thrift.T } break; + case PAXOS_IN_PROGRESS: + if (value == null) { + unsetPaxos_in_progress(); + } else { + setPaxos_in_progress((Boolean)value); + } + break; + } } @@ -282,6 +344,9 @@ public class TimedOutException extends TException implements 
org.apache.thrift.T case ACKNOWLEDGED_BY_BATCHLOG: return Boolean.valueOf(isAcknowledged_by_batchlog()); + case PAXOS_IN_PROGRESS: + return Boolean.valueOf(isPaxos_in_progress()); + } throw new IllegalStateException(); } @@ -297,6 +362,8 @@ public class TimedOutException extends TException implements org.apache.thrift.T return isSetAcknowledged_by(); case ACKNOWLEDGED_BY_BATCHLOG: return isSetAcknowledged_by_batchlog(); + case PAXOS_IN_PROGRESS: + return isSetPaxos_in_progress(); } throw new IllegalStateException(); } @@ -332,6 +399,15 @@ public class TimedOutException extends TException implements org.apache.thrift.T return false; } + boolean this_present_paxos_in_progress = true && this.isSetPaxos_in_progress(); + boolean that_present_paxos_in_progress = true && that.isSetPaxos_in_progress(); + if (this_present_paxos_in_progress || that_present_paxos_in_progress) { + if (!(this_present_paxos_in_progress && that_present_paxos_in_progress)) + return false; + if (this.paxos_in_progress != that.paxos_in_progress) + return false; + } + return true; } @@ -349,6 +425,11 @@ public class TimedOutException extends TException implements org.apache.thrift.T if (present_acknowledged_by_batchlog) builder.append(acknowledged_by_batchlog); + boolean present_paxos_in_progress = true && (isSetPaxos_in_progress()); + builder.append(present_paxos_in_progress); + if (present_paxos_in_progress) + builder.append(paxos_in_progress); + return builder.toHashCode(); } @@ -380,6 +461,16 @@ public class TimedOutException extends TException implements org.apache.thrift.T return lastComparison; } } + lastComparison = Boolean.valueOf(isSetPaxos_in_progress()).compareTo(typedOther.isSetPaxos_in_progress()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetPaxos_in_progress()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paxos_in_progress, typedOther.paxos_in_progress); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -411,6 
+502,12 @@ public class TimedOutException extends TException implements org.apache.thrift.T sb.append(this.acknowledged_by_batchlog); first = false; } + if (isSetPaxos_in_progress()) { + if (!first) sb.append(", "); + sb.append("paxos_in_progress:"); + sb.append(this.paxos_in_progress); + first = false; + } sb.append(")"); return sb.toString(); } @@ -472,6 +569,14 @@ public class TimedOutException extends TException implements org.apache.thrift.T org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // PAXOS_IN_PROGRESS + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.paxos_in_progress = iprot.readBool(); + struct.setPaxos_in_progressIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -497,6 +602,11 @@ public class TimedOutException extends TException implements org.apache.thrift.T oprot.writeBool(struct.acknowledged_by_batchlog); oprot.writeFieldEnd(); } + if (struct.isSetPaxos_in_progress()) { + oprot.writeFieldBegin(PAXOS_IN_PROGRESS_FIELD_DESC); + oprot.writeBool(struct.paxos_in_progress); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -521,19 +631,25 @@ public class TimedOutException extends TException implements org.apache.thrift.T if (struct.isSetAcknowledged_by_batchlog()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetPaxos_in_progress()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetAcknowledged_by()) { oprot.writeI32(struct.acknowledged_by); } if (struct.isSetAcknowledged_by_batchlog()) { oprot.writeBool(struct.acknowledged_by_batchlog); } + if (struct.isSetPaxos_in_progress()) { + oprot.writeBool(struct.paxos_in_progress); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, TimedOutException struct) throws 
org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.acknowledged_by = iprot.readI32(); struct.setAcknowledged_byIsSet(true); @@ -542,6 +658,10 @@ public class TimedOutException extends TException implements org.apache.thrift.T struct.acknowledged_by_batchlog = iprot.readBool(); struct.setAcknowledged_by_batchlogIsSet(true); } + if (incoming.get(2)) { + struct.paxos_in_progress = iprot.readBool(); + struct.setPaxos_in_progressIsSet(true); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 133a8ff..7766f94 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -371,7 +371,7 @@ public abstract class ModificationStatement implements CQLStatement ColumnFamily updates = updateForKey(key, clusteringPrefix, params); ColumnFamily expected = buildConditions(key, clusteringPrefix, params); - boolean result = StorageProxy.cas(keyspace(), columnFamily(), key, expected, updates); + boolean result = StorageProxy.cas(keyspace(), columnFamily(), key, expected, updates, cl); ResultSet.Metadata metadata = new ResultSet.Metadata(Collections.singletonList(new ColumnSpecification(keyspace(), columnFamily(), RESULT_COLUMN, BooleanType.instance))); List<List<ByteBuffer>> newRows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(result))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index aec3c2d..e62da1b 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -281,6 +281,19 @@ public enum ConsistencyLevel } } + public void validateForCas(String table) throws InvalidRequestException + { + switch (this) + { + case LOCAL_QUORUM: + case EACH_QUORUM: + requireNetworkTopologyStrategy(table); + break; + case ANY: + throw new InvalidRequestException("ANY is not supported with CAS. Use SERIAL if you mean, make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads"); + } + } + public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException { if (this == ConsistencyLevel.ANY) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/db/WriteResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java index b89682c..83b579a 100644 --- a/src/java/org/apache/cassandra/db/WriteResponse.java +++ b/src/java/org/apache/cassandra/db/WriteResponse.java @@ -26,9 +26,7 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; /* - * This message is sent back the row mutation verb handler - * and basically specifies if the write succeeded or not for a particular - * key in a table + * This empty response is sent by a replica to inform the coordinator that the write succeeded */ public class WriteResponse { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 829fc0a..7e288ab 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -195,9 +195,11 @@ public class StorageProxy implements StorageProxyMBean * * @return true if the operation succeeds in updating the row */ - public static boolean cas(String table, String cfName, ByteBuffer key, ColumnFamily expected, ColumnFamily updates) + public static boolean cas(String table, String cfName, ByteBuffer key, ColumnFamily expected, ColumnFamily updates, ConsistencyLevel consistencyLevel) throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException { + consistencyLevel.validateForCas(table); + CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName); long start = System.nanoTime(); @@ -232,7 +234,10 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); if (proposePaxos(proposal, liveEndpoints, requiredParticipants)) { - commitPaxos(proposal, liveEndpoints); + if (consistencyLevel == ConsistencyLevel.SERIAL) + sendCommit(proposal, liveEndpoints); + else + commitPaxos(proposal, consistencyLevel); Tracing.trace("CAS successful"); return true; } @@ -322,8 +327,17 @@ public class StorageProxy implements StorageProxyMBean { Tracing.trace("Finishing incomplete paxos round {}", inProgress); if (proposePaxos(inProgress, liveEndpoints, requiredParticipants)) - commitPaxos(inProgress, liveEndpoints); - // no need to sleep here + { + try + { + commitPaxos(inProgress, ConsistencyLevel.QUORUM); + } + catch (WriteTimeoutException e) + { + // let caller retry or turn it into a cas timeout, since it's someone else's write we're applying + return null; + } + } return null; } @@ -335,7 +349,7 @@ public class StorageProxy implements 
StorageProxyMBean if (Iterables.size(missingMRC) > 0) { Tracing.trace("Repairing replicas that missed the most recent commit"); - commitPaxos(mostRecent, missingMRC); + sendCommit(mostRecent, missingMRC); // TODO: provided commits don't invalidate the prepare we just did above (which they don't), we could just wait // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means // adding the ability to have commitPaxos block, which is exactly what CASSANDRA-5442 will do. So once we have that @@ -346,6 +360,16 @@ public class StorageProxy implements StorageProxyMBean return ballot; } + /** + * Unlike commitPaxos, this does not wait for replies + */ + private static void sendCommit(Commit commit, Iterable<InetAddress> replicas) + { + MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, commit, Commit.serializer); + for (InetAddress target : replicas) + MessagingService.instance().sendOneWay(message, target); + } + private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants) throws WriteTimeoutException { @@ -369,11 +393,25 @@ public class StorageProxy implements StorageProxyMBean return callback.getSuccessful() >= requiredParticipants; } - private static void commitPaxos(Commit proposal, Iterable<InetAddress> endpoints) + private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException { + Table table = Table.open(proposal.update.metadata().ksName); + + Token tk = StorageService.getPartitioner().getToken(proposal.key); + List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table.getName(), tk); + Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table.getName()); + + AbstractReplicationStrategy rs = table.getReplicationStrategy(); + AbstractWriteResponseHandler responseHandler = 
rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE); + MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); - for (InetAddress target : endpoints) - MessagingService.instance().sendOneWay(message, target); + for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints)) + { + if (FailureDetector.instance.isAlive(destination)) + MessagingService.instance().sendRR(message, destination, responseHandler); + } + + responseHandler.get(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java index d7944b6..190d654 100644 --- a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java +++ b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java @@ -1,12 +1,19 @@ package org.apache.cassandra.service.paxos; +import org.apache.cassandra.db.WriteResponse; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tracing.Tracing; public class CommitVerbHandler implements IVerbHandler<Commit> { public void doVerb(MessageIn<Commit> message, int id) { PaxosState.commit(message.payload); + + WriteResponse response = new WriteResponse(); + Tracing.trace("Enqueuing acknowledge to {}", message.from); + MessagingService.instance().sendReply(response.createMessage(), id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 4c188f7..870cdbd 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -724,7 +724,7 @@ public class CassandraServer implements Cassandra.Iface } } - public boolean cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates) + public boolean cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { if (startSessionIfRequested()) @@ -780,7 +780,7 @@ public class CassandraServer implements Cassandra.Iface } schedule(DatabaseDescriptor.getWriteRpcTimeout()); - return StorageProxy.cas(cState.getKeyspace(), column_family, key, cfExpected, cfUpdates); + return StorageProxy.cas(cState.getKeyspace(), column_family, key, cfExpected, cfUpdates, ThriftConversion.fromThrift(consistency_level)); } catch (RequestTimeoutException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/src/java/org/apache/cassandra/thrift/ThriftConversion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java index 0642129..28725f0 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java +++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java @@ -84,6 +84,8 @@ public class ThriftConversion toe.setAcknowledged_by_batchlog(false); else if (wte.writeType == WriteType.BATCH) toe.setAcknowledged_by_batchlog(true); + else if (wte.writeType == WriteType.CAS) + toe.setPaxos_in_progress(true); } return toe; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc3597d3/test/system/test_thrift_server.py 
---------------------------------------------------------------------- diff --git a/test/system/test_thrift_server.py b/test/system/test_thrift_server.py index df224e3..6161fe0 100644 --- a/test/system/test_thrift_server.py +++ b/test/system/test_thrift_server.py @@ -232,15 +232,18 @@ class TestMutations(ThriftTester): def test_cas(self): _set_keyspace('Keyspace1') - assert not client.cas('key1', 'Standard1', _SIMPLE_COLUMNS, _SIMPLE_COLUMNS) + def cas(expected, updates): + return client.cas('key1', 'Standard1', expected, updates, ConsistencyLevel.ONE) - assert client.cas('key1', 'Standard1', None, _SIMPLE_COLUMNS) + assert not cas(_SIMPLE_COLUMNS, _SIMPLE_COLUMNS) + + assert cas(None, _SIMPLE_COLUMNS) result = [cosc.column for cosc in _big_slice('key1', ColumnParent('Standard1'))] # CAS will use its own timestamp, so we can't just compare result == _SIMPLE_COLUMNS assert dict((c.name, c.value) for c in result) == dict((c.name, c.value) for c in _SIMPLE_COLUMNS), result - assert not client.cas('key1', 'Standard1', None, _SIMPLE_COLUMNS) + assert not cas(None, _SIMPLE_COLUMNS) # CL.SERIAL for reads assert client.get('key1', ColumnPath('Standard1', column='c1'), ConsistencyLevel.SERIAL).column.value == 'value1'