Author: gdusbabek
Date: Thu May 20 17:13:17 2010
New Revision: 946717

URL: http://svn.apache.org/viewvc?rev=946717&view=rev
Log:
service call to check for schema agreement. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1075

Added: cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java Modified: cassandra/trunk/interface/cassandra.thrift cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Modified: cassandra/trunk/interface/cassandra.thrift URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=946717&r1=946716&r2=946717&view=diff ============================================================================== --- cassandra/trunk/interface/cassandra.thrift (original) +++ cassandra/trunk/interface/cassandra.thrift Thu May 20 17:13:17 2010 @@ -435,6 +435,14 @@ service Cassandra { // Meta-APIs -- APIs to get information about the node or cluster, // rather than user data. The nodeprobe program provides usage examples. + + /** + * ask the cluster if they all are using the same migration id. returns a map of version->hosts-on-that-version. + * hosts that did not respond will be under the key DatabaseDescriptor.INITIAL_VERSION. agreement can be determined + * by checking if the size of the map is 1. 
+ */ + map<string, list<string>> check_schema_agreement() + throws (1: InvalidRequestException ire), /** list the defined keyspaces in this cluster */ set<string> describe_keyspaces(), Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=946717&r1=946716&r2=946717&view=diff ============================================================================== --- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original) +++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Thu May 20 17:13:17 2010 @@ -162,6 +162,13 @@ public class Cassandra { public void truncate(String keyspace, String cfname) throws InvalidRequestException, UnavailableException, TException; /** + * ask the cluster if they all are using the same migration id. returns a map of version->hosts-on-that-version. + * hosts that did not respond will be under the key DatabaseDescriptor.INITIAL_VERSION. agreement can be determined + * by checking if the size of the map is 1. 
+ */ + public Map<String,List<String>> check_schema_agreement() throws InvalidRequestException, TException; + + /** * list the defined keyspaces in this cluster */ public Set<String> describe_keyspaces() throws TException; @@ -757,6 +764,41 @@ public class Cassandra { return; } + public Map<String,List<String>> check_schema_agreement() throws InvalidRequestException, TException + { + send_check_schema_agreement(); + return recv_check_schema_agreement(); + } + + public void send_check_schema_agreement() throws TException + { + oprot_.writeMessageBegin(new TMessage("check_schema_agreement", TMessageType.CALL, seqid_)); + check_schema_agreement_args args = new check_schema_agreement_args(); + args.write(oprot_); + oprot_.writeMessageEnd(); + oprot_.getTransport().flush(); + } + + public Map<String,List<String>> recv_check_schema_agreement() throws InvalidRequestException, TException + { + TMessage msg = iprot_.readMessageBegin(); + if (msg.type == TMessageType.EXCEPTION) { + TApplicationException x = TApplicationException.read(iprot_); + iprot_.readMessageEnd(); + throw x; + } + check_schema_agreement_result result = new check_schema_agreement_result(); + result.read(iprot_); + iprot_.readMessageEnd(); + if (result.isSetSuccess()) { + return result.success; + } + if (result.ire != null) { + throw result.ire; + } + throw new TApplicationException(TApplicationException.MISSING_RESULT, "check_schema_agreement failed: unknown result"); + } + public Set<String> describe_keyspaces() throws TException { send_describe_keyspaces(); @@ -1177,6 +1219,7 @@ public class Cassandra { processMap_.put("remove", new remove()); processMap_.put("batch_mutate", new batch_mutate()); processMap_.put("truncate", new truncate()); + processMap_.put("check_schema_agreement", new check_schema_agreement()); processMap_.put("describe_keyspaces", new describe_keyspaces()); processMap_.put("describe_cluster_name", new describe_cluster_name()); processMap_.put("describe_version", new 
describe_version()); @@ -1715,6 +1758,44 @@ public class Cassandra { } + private class check_schema_agreement implements ProcessFunction { + public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException + { + check_schema_agreement_args args = new check_schema_agreement_args(); + try { + args.read(iprot); + } catch (TProtocolException e) { + iprot.readMessageEnd(); + TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage()); + oprot.writeMessageBegin(new TMessage("check_schema_agreement", TMessageType.EXCEPTION, seqid)); + x.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + return; + } + iprot.readMessageEnd(); + check_schema_agreement_result result = new check_schema_agreement_result(); + try { + result.success = iface_.check_schema_agreement(); + } catch (InvalidRequestException ire) { + result.ire = ire; + } catch (Throwable th) { + LOGGER.error("Internal error processing check_schema_agreement", th); + TApplicationException x = new TApplicationException(TApplicationException.INTERNAL_ERROR, "Internal error processing check_schema_agreement"); + oprot.writeMessageBegin(new TMessage("check_schema_agreement", TMessageType.EXCEPTION, seqid)); + x.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + return; + } + oprot.writeMessageBegin(new TMessage("check_schema_agreement", TMessageType.REPLY, seqid)); + result.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + } + + } + private class describe_keyspaces implements ProcessFunction { public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException { @@ -13999,6 +14080,598 @@ public class Cassandra { } + public static class check_schema_agreement_args implements TBase<check_schema_agreement_args._Fields>, java.io.Serializable, Cloneable, Comparable<check_schema_agreement_args> { + private static final TStruct STRUCT_DESC = new 
TStruct("check_schema_agreement_args"); + + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements TFieldIdEnum { +; + + private static final Map<Integer, _Fields> byId = new HashMap<Integer, _Fields>(); + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byId.put((int)field._thriftId, field); + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + return byId.get(fieldId); + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class) {{ + }}); + + static { + FieldMetaData.addStructMetaDataMap(check_schema_agreement_args.class, metaDataMap); + } + + public check_schema_agreement_args() { + } + + /** + * Performs a deep copy on <i>other</i>. 
+ */ + public check_schema_agreement_args(check_schema_agreement_args other) { + } + + public check_schema_agreement_args deepCopy() { + return new check_schema_agreement_args(this); + } + + @Deprecated + public check_schema_agreement_args clone() { + return new check_schema_agreement_args(this); + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public void setFieldValue(int fieldID, Object value) { + setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value); + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + public Object getFieldValue(int fieldId) { + return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId)); + } + + /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + public boolean isSet(int fieldID) { + return isSet(_Fields.findByThriftIdOrThrow(fieldID)); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof check_schema_agreement_args) + return this.equals((check_schema_agreement_args)that); + return false; + } + + public boolean equals(check_schema_agreement_args that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(check_schema_agreement_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + check_schema_agreement_args typedOther = (check_schema_agreement_args)other; + + return 0; + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) { + default: + 
TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(TProtocol oprot) throws TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("check_schema_agreement_args("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + } + + } + + public static class check_schema_agreement_result implements TBase<check_schema_agreement_result._Fields>, java.io.Serializable, Cloneable { + private static final TStruct STRUCT_DESC = new TStruct("check_schema_agreement_result"); + + private static final TField SUCCESS_FIELD_DESC = new TField("success", TType.MAP, (short)0); + private static final TField IRE_FIELD_DESC = new TField("ire", TType.STRUCT, (short)1); + + public Map<String,List<String>> success; + public InvalidRequestException ire; + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements TFieldIdEnum { + SUCCESS((short)0, "success"), + IRE((short)1, "ire"); + + private static final Map<Integer, _Fields> byId = new HashMap<Integer, _Fields>(); + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byId.put((int)field._thriftId, field); + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + return byId.get(fieldId); + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. 
+ */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + + public static final Map<_Fields, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class) {{ + put(_Fields.SUCCESS, new FieldMetaData("success", TFieldRequirementType.DEFAULT, + new MapMetaData(TType.MAP, + new FieldValueMetaData(TType.STRING), + new ListMetaData(TType.LIST, + new FieldValueMetaData(TType.STRING))))); + put(_Fields.IRE, new FieldMetaData("ire", TFieldRequirementType.DEFAULT, + new FieldValueMetaData(TType.STRUCT))); + }}); + + static { + FieldMetaData.addStructMetaDataMap(check_schema_agreement_result.class, metaDataMap); + } + + public check_schema_agreement_result() { + } + + public check_schema_agreement_result( + Map<String,List<String>> success, + InvalidRequestException ire) + { + this(); + this.success = success; + this.ire = ire; + } + + /** + * Performs a deep copy on <i>other</i>. 
+ */ + public check_schema_agreement_result(check_schema_agreement_result other) { + if (other.isSetSuccess()) { + Map<String,List<String>> __this__success = new HashMap<String,List<String>>(); + for (Map.Entry<String, List<String>> other_element : other.success.entrySet()) { + + String other_element_key = other_element.getKey(); + List<String> other_element_value = other_element.getValue(); + + String __this__success_copy_key = other_element_key; + + List<String> __this__success_copy_value = new ArrayList<String>(); + for (String other_element_value_element : other_element_value) { + __this__success_copy_value.add(other_element_value_element); + } + + __this__success.put(__this__success_copy_key, __this__success_copy_value); + } + this.success = __this__success; + } + if (other.isSetIre()) { + this.ire = new InvalidRequestException(other.ire); + } + } + + public check_schema_agreement_result deepCopy() { + return new check_schema_agreement_result(this); + } + + @Deprecated + public check_schema_agreement_result clone() { + return new check_schema_agreement_result(this); + } + + public int getSuccessSize() { + return (this.success == null) ? 
0 : this.success.size(); + } + + public void putToSuccess(String key, List<String> val) { + if (this.success == null) { + this.success = new HashMap<String,List<String>>(); + } + this.success.put(key, val); + } + + public Map<String,List<String>> getSuccess() { + return this.success; + } + + public check_schema_agreement_result setSuccess(Map<String,List<String>> success) { + this.success = success; + return this; + } + + public void unsetSuccess() { + this.success = null; + } + + /** Returns true if field success is set (has been asigned a value) and false otherwise */ + public boolean isSetSuccess() { + return this.success != null; + } + + public void setSuccessIsSet(boolean value) { + if (!value) { + this.success = null; + } + } + + public InvalidRequestException getIre() { + return this.ire; + } + + public check_schema_agreement_result setIre(InvalidRequestException ire) { + this.ire = ire; + return this; + } + + public void unsetIre() { + this.ire = null; + } + + /** Returns true if field ire is set (has been asigned a value) and false otherwise */ + public boolean isSetIre() { + return this.ire != null; + } + + public void setIreIsSet(boolean value) { + if (!value) { + this.ire = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((Map<String,List<String>>)value); + } + break; + + case IRE: + if (value == null) { + unsetIre(); + } else { + setIre((InvalidRequestException)value); + } + break; + + } + } + + public void setFieldValue(int fieldID, Object value) { + setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value); + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return getSuccess(); + + case IRE: + return getIre(); + + } + throw new IllegalStateException(); + } + + public Object getFieldValue(int fieldId) { + return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId)); + } + + /** Returns 
true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + switch (field) { + case SUCCESS: + return isSetSuccess(); + case IRE: + return isSetIre(); + } + throw new IllegalStateException(); + } + + public boolean isSet(int fieldID) { + return isSet(_Fields.findByThriftIdOrThrow(fieldID)); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof check_schema_agreement_result) + return this.equals((check_schema_agreement_result)that); + return false; + } + + public boolean equals(check_schema_agreement_result that) { + if (that == null) + return false; + + boolean this_present_success = true && this.isSetSuccess(); + boolean that_present_success = true && that.isSetSuccess(); + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (!this.success.equals(that.success)) + return false; + } + + boolean this_present_ire = true && this.isSetIre(); + boolean that_present_ire = true && that.isSetIre(); + if (this_present_ire || that_present_ire) { + if (!(this_present_ire && that_present_ire)) + return false; + if (!this.ire.equals(that.ire)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) { + case 0: // SUCCESS + if (field.type == TType.MAP) { + { + TMap _map73 = iprot.readMapBegin(); + this.success = new HashMap<String,List<String>>(2*_map73.size); + for (int _i74 = 0; _i74 < _map73.size; ++_i74) + { + String _key75; + List<String> _val76; + _key75 = iprot.readString(); + { + TList _list77 = iprot.readListBegin(); + _val76 = new ArrayList<String>(_list77.size); + for (int _i78 = 0; _i78 < 
_list77.size; ++_i78) + { + String _elem79; + _elem79 = iprot.readString(); + _val76.add(_elem79); + } + iprot.readListEnd(); + } + this.success.put(_key75, _val76); + } + iprot.readMapEnd(); + } + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 1: // IRE + if (field.type == TType.STRUCT) { + this.ire = new InvalidRequestException(); + this.ire.read(iprot); + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + default: + TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(TProtocol oprot) throws TException { + oprot.writeStructBegin(STRUCT_DESC); + + if (this.isSetSuccess()) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + { + oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.success.size())); + for (Map.Entry<String, List<String>> _iter80 : this.success.entrySet()) + { + oprot.writeString(_iter80.getKey()); + { + oprot.writeListBegin(new TList(TType.STRING, _iter80.getValue().size())); + for (String _iter81 : _iter80.getValue()) + { + oprot.writeString(_iter81); + } + oprot.writeListEnd(); + } + } + oprot.writeMapEnd(); + } + oprot.writeFieldEnd(); + } else if (this.isSetIre()) { + oprot.writeFieldBegin(IRE_FIELD_DESC); + this.ire.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("check_schema_agreement_result("); + boolean first = true; + + sb.append("success:"); + if (this.success == null) { + sb.append("null"); + } else { + sb.append(this.success); + } + first = false; + if (!first) sb.append(", "); + sb.append("ire:"); + if (this.ire == null) { + sb.append("null"); + } else { + sb.append(this.ire); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException 
{ + // check for required fields + } + + } + public static class describe_keyspaces_args implements TBase<describe_keyspaces_args._Fields>, java.io.Serializable, Cloneable, Comparable<describe_keyspaces_args> { private static final TStruct STRUCT_DESC = new TStruct("describe_keyspaces_args"); @@ -14420,13 +15093,13 @@ public class Cassandra { case 0: // SUCCESS if (field.type == TType.SET) { { - TSet _set73 = iprot.readSetBegin(); - this.success = new HashSet<String>(2*_set73.size); - for (int _i74 = 0; _i74 < _set73.size; ++_i74) + TSet _set82 = iprot.readSetBegin(); + this.success = new HashSet<String>(2*_set82.size); + for (int _i83 = 0; _i83 < _set82.size; ++_i83) { - String _elem75; - _elem75 = iprot.readString(); - this.success.add(_elem75); + String _elem84; + _elem84 = iprot.readString(); + this.success.add(_elem84); } iprot.readSetEnd(); } @@ -14452,9 +15125,9 @@ public class Cassandra { oprot.writeFieldBegin(SUCCESS_FIELD_DESC); { oprot.writeSetBegin(new TSet(TType.STRING, this.success.size())); - for (String _iter76 : this.success) + for (String _iter85 : this.success) { - oprot.writeString(_iter76); + oprot.writeString(_iter85); } oprot.writeSetEnd(); } @@ -15961,14 +16634,14 @@ public class Cassandra { case 0: // SUCCESS if (field.type == TType.LIST) { { - TList _list77 = iprot.readListBegin(); - this.success = new ArrayList<TokenRange>(_list77.size); - for (int _i78 = 0; _i78 < _list77.size; ++_i78) + TList _list86 = iprot.readListBegin(); + this.success = new ArrayList<TokenRange>(_list86.size); + for (int _i87 = 0; _i87 < _list86.size; ++_i87) { - TokenRange _elem79; - _elem79 = new TokenRange(); - _elem79.read(iprot); - this.success.add(_elem79); + TokenRange _elem88; + _elem88 = new TokenRange(); + _elem88.read(iprot); + this.success.add(_elem88); } iprot.readListEnd(); } @@ -15994,9 +16667,9 @@ public class Cassandra { oprot.writeFieldBegin(SUCCESS_FIELD_DESC); { oprot.writeListBegin(new TList(TType.STRUCT, this.success.size())); - for 
(TokenRange _iter80 : this.success) + for (TokenRange _iter89 : this.success) { - _iter80.write(oprot); + _iter89.write(oprot); } oprot.writeListEnd(); } @@ -16617,27 +17290,27 @@ public class Cassandra { case 0: // SUCCESS if (field.type == TType.MAP) { { - TMap _map81 = iprot.readMapBegin(); - this.success = new HashMap<String,Map<String,String>>(2*_map81.size); - for (int _i82 = 0; _i82 < _map81.size; ++_i82) + TMap _map90 = iprot.readMapBegin(); + this.success = new HashMap<String,Map<String,String>>(2*_map90.size); + for (int _i91 = 0; _i91 < _map90.size; ++_i91) { - String _key83; - Map<String,String> _val84; - _key83 = iprot.readString(); + String _key92; + Map<String,String> _val93; + _key92 = iprot.readString(); { - TMap _map85 = iprot.readMapBegin(); - _val84 = new HashMap<String,String>(2*_map85.size); - for (int _i86 = 0; _i86 < _map85.size; ++_i86) + TMap _map94 = iprot.readMapBegin(); + _val93 = new HashMap<String,String>(2*_map94.size); + for (int _i95 = 0; _i95 < _map94.size; ++_i95) { - String _key87; - String _val88; - _key87 = iprot.readString(); - _val88 = iprot.readString(); - _val84.put(_key87, _val88); + String _key96; + String _val97; + _key96 = iprot.readString(); + _val97 = iprot.readString(); + _val93.put(_key96, _val97); } iprot.readMapEnd(); } - this.success.put(_key83, _val84); + this.success.put(_key92, _val93); } iprot.readMapEnd(); } @@ -16671,15 +17344,15 @@ public class Cassandra { oprot.writeFieldBegin(SUCCESS_FIELD_DESC); { oprot.writeMapBegin(new TMap(TType.STRING, TType.MAP, this.success.size())); - for (Map.Entry<String, Map<String,String>> _iter89 : this.success.entrySet()) + for (Map.Entry<String, Map<String,String>> _iter98 : this.success.entrySet()) { - oprot.writeString(_iter89.getKey()); + oprot.writeString(_iter98.getKey()); { - oprot.writeMapBegin(new TMap(TType.STRING, TType.STRING, _iter89.getValue().size())); - for (Map.Entry<String, String> _iter90 : _iter89.getValue().entrySet()) + oprot.writeMapBegin(new 
TMap(TType.STRING, TType.STRING, _iter98.getValue().size())); + for (Map.Entry<String, String> _iter99 : _iter98.getValue().entrySet()) { - oprot.writeString(_iter90.getKey()); - oprot.writeString(_iter90.getValue()); + oprot.writeString(_iter99.getKey()); + oprot.writeString(_iter99.getValue()); } oprot.writeMapEnd(); } @@ -17435,13 +18108,13 @@ public class Cassandra { case 0: // SUCCESS if (field.type == TType.LIST) { { - TList _list91 = iprot.readListBegin(); - this.success = new ArrayList<String>(_list91.size); - for (int _i92 = 0; _i92 < _list91.size; ++_i92) + TList _list100 = iprot.readListBegin(); + this.success = new ArrayList<String>(_list100.size); + for (int _i101 = 0; _i101 < _list100.size; ++_i101) { - String _elem93; - _elem93 = iprot.readString(); - this.success.add(_elem93); + String _elem102; + _elem102 = iprot.readString(); + this.success.add(_elem102); } iprot.readListEnd(); } @@ -17467,9 +18140,9 @@ public class Cassandra { oprot.writeFieldBegin(SUCCESS_FIELD_DESC); { oprot.writeListBegin(new TList(TType.STRING, this.success.size())); - for (String _iter94 : this.success) + for (String _iter103 : this.success) { - oprot.writeString(_iter94); + oprot.writeString(_iter103); } oprot.writeListEnd(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=946717&r1=946716&r2=946717&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu May 20 17:13:17 2010 @@ -51,6 +51,8 @@ import org.apache.cassandra.service.Stor import static org.apache.cassandra.utils.FBUtilities.UTF8; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.thrift.*; +import org.apache.thrift.TException; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.cassandra.avro.AvroRecordFactory.*; @@ -573,4 +575,10 @@ public class CassandraServer implements { return API_VERSION; } + + public Map<String, List<String>> check_schema_agreement() + { + logger.debug("checking schema agreement"); + return StorageProxy.checkSchemaAgreement(); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=946717&r1=946716&r2=946717&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu May 20 17:13:17 2010 @@ -84,7 +84,7 @@ public class DatabaseDescriptor private final static String STORAGE_CONF_FILE = "cassandra.yaml"; - private static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type nibble set to 1, everything else to zero. + public static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type nibble set to 1, everything else to zero. 
private static UUID defsVersion = INITIAL_VERSION; /** Added: cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=946717&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java Thu May 20 17:13:17 2010 @@ -0,0 +1,41 @@ +package org.apache.cassandra.db; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +public class SchemaCheckVerbHandler implements IVerbHandler +{ + private final Logger logger = LoggerFactory.getLogger(SchemaCheckVerbHandler.class); + + @Override + public void doVerb(Message message) + { + logger.debug("Received schema check request."); + Message response = message.getReply(FBUtilities.getLocalAddress(), DatabaseDescriptor.getDefsVersion().toString().getBytes()); + MessagingService.instance.sendOneWay(response, message.getFrom()); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=946717&r1=946716&r2=946717&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu May 20 17:13:17 2010 @@ -26,12 +26,17 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -55,6 +60,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -565,6 +571,76 @@ public class StorageProxy 
implements Sto } /** + * initiate a request/response session with each live node to check whether or not everybody is using the same + * migration id. This is useful for determining if a schema change has propagated through the cluster. Disagreement + * is assumed if any node fails to respond. + */ + public static Map<String, List<String>> checkSchemaAgreement() + { + final Map<String, List<String>> results = new HashMap<String, List<String>>(); + + final String myVersion = DatabaseDescriptor.getDefsVersion().toString(); + final Map<InetAddress, UUID> versions = new ConcurrentHashMap<InetAddress, UUID>(); + final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers(); + final Message msg = new Message(FBUtilities.getLocalAddress(), StageManager.MIGRATION_STAGE, StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY); + final CountDownLatch latch = new CountDownLatch(liveHosts.size()); + // an empty message acts as a request to the SchemaCheckVerbHandler. + MessagingService.instance.sendRR(msg, liveHosts.toArray(new InetAddress[]{}), new IAsyncCallback() + { + @Override + public void response(Message msg) + { + // record the response from the remote node. + logger.debug("Received schema check response from " + msg.getFrom().getHostAddress()); + UUID theirVersion = UUID.fromString(new String(msg.getMessageBody())); + versions.put(msg.getFrom(), theirVersion); + latch.countDown(); + } + }); + + try + { + // wait for as long as possible. timeout-1s if possible. + latch.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) + { + throw new AssertionError("This latch shouldn't have been interrupted."); + } + + logger.debug("My version is " + myVersion); + + // first, indicate any hosts that did not respond. 
+ final Set<InetAddress> ackedHosts = versions.keySet(); + if (ackedHosts.size() < liveHosts.size()) + { + Set<InetAddress> missingHosts = new HashSet<InetAddress>(liveHosts); + missingHosts.removeAll(ackedHosts); + assert missingHosts.size() > 0; + List<String> missingHostNames = new ArrayList<String>(missingHosts.size()); + for (InetAddress host : missingHosts) + missingHostNames.add(host.getHostAddress()); + results.put(DatabaseDescriptor.INITIAL_VERSION.toString(), missingHostNames); + logger.debug("Hosts not in agreement. Didn't get a response from everybody: " + StringUtils.join(missingHostNames, ",")); + } + + // check for version disagreement. log the hosts that don't agree. + for (InetAddress host : ackedHosts) + { + String uuid = versions.get(host).toString(); + if (!results.containsKey(uuid)) + results.put(uuid, new ArrayList<String>()); + results.get(uuid).add(host.getHostAddress()); + if (!uuid.equals(myVersion)) + logger.debug("%s disagrees (%s)", host.getHostAddress(), uuid); + } + if (results.size() == 1) + logger.debug("Schemas are in agreement."); + + return results; + } + + /** * returns an iterator that will return ranges in ring order, starting with the one that contains the start token */ private static Iterable<Pair<AbstractBounds, List<InetAddress>>> getRangeIterator(final List<Pair<AbstractBounds, List<InetAddress>>> ranges, Token start) Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=946717&r1=946716&r2=946717&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu May 20 17:13:17 2010 @@ -111,6 +111,7 @@ public class StorageService implements I DEFINITIONS_ANNOUNCE, 
DEFINITIONS_UPDATE_RESPONSE, TRUNCATE, + SCHEMA_CHECK, ; // remember to add new verbs at the end, since we serialize by ordinal } @@ -236,6 +237,7 @@ public class StorageService implements I MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new DefinitionsAnnounceVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE, new DefinitionsUpdateResponseVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.TRUNCATE, new TruncateVerbHandler()); + MessagingService.instance.registerVerbHandlers(Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler()); replicationStrategies = new HashMap<String, AbstractReplicationStrategy>(); for (String table : DatabaseDescriptor.getNonSystemTables()) Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=946717&r1=946716&r2=946717&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Thu May 20 17:13:17 2010 @@ -856,5 +856,12 @@ public class CassandraServer implements keySpace.set(keyspace); } + @Override + public Map<String, List<String>> check_schema_agreement() throws TException, InvalidRequestException + { + logger.debug("checking schema agreement"); + return StorageProxy.checkSchemaAgreement(); + } + // main method moved to CassandraDaemon }