Author: gdusbabek
Date: Thu May 20 17:13:17 2010
New Revision: 946717

URL: http://svn.apache.org/viewvc?rev=946717&view=rev
Log:
service call to check for schema agreement. patch by gdusbabek, reviewed by 
jbellis. CASSANDRA-1075

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
Modified:
    cassandra/trunk/interface/cassandra.thrift
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/trunk/interface/cassandra.thrift
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Thu May 20 17:13:17 2010
@@ -435,6 +435,14 @@ service Cassandra {
     
   // Meta-APIs -- APIs to get information about the node or cluster,
   // rather than user data.  The nodeprobe program provides usage examples.
+  
+  /** 
+   * ask the cluster if they all are using the same migration id. returns a 
map of version->hosts-on-that-version.
+   * hosts that did not respond will be under the key 
DatabaseDescriptor.INITIAL_VERSION. agreement can be determined
+   * by checking if the size of the map is 1. 
+   */
+  map<string, list<string>> check_schema_agreement()
+       throws (1: InvalidRequestException ire),
 
   /** list the defined keyspaces in this cluster */
   set<string> describe_keyspaces(),

Modified: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
 (original)
+++ 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
 Thu May 20 17:13:17 2010
@@ -162,6 +162,13 @@ public class Cassandra {
     public void truncate(String keyspace, String cfname) throws 
InvalidRequestException, UnavailableException, TException;
 
     /**
+     * ask the cluster if they all are using the same migration id. returns a 
map of version->hosts-on-that-version.
+     * hosts that did not respond will be under the key 
DatabaseDescriptor.INITIAL_VERSION. agreement can be determined
+     * by checking if the size of the map is 1.
+     */
+    public Map<String,List<String>> check_schema_agreement() throws 
InvalidRequestException, TException;
+
+    /**
      * list the defined keyspaces in this cluster
      */
     public Set<String> describe_keyspaces() throws TException;
@@ -757,6 +764,41 @@ public class Cassandra {
       return;
     }
 
+    public Map<String,List<String>> check_schema_agreement() throws 
InvalidRequestException, TException
+    {
+      send_check_schema_agreement();
+      return recv_check_schema_agreement();
+    }
+
+    public void send_check_schema_agreement() throws TException
+    {
+      oprot_.writeMessageBegin(new TMessage("check_schema_agreement", 
TMessageType.CALL, seqid_));
+      check_schema_agreement_args args = new check_schema_agreement_args();
+      args.write(oprot_);
+      oprot_.writeMessageEnd();
+      oprot_.getTransport().flush();
+    }
+
+    public Map<String,List<String>> recv_check_schema_agreement() throws 
InvalidRequestException, TException
+    {
+      TMessage msg = iprot_.readMessageBegin();
+      if (msg.type == TMessageType.EXCEPTION) {
+        TApplicationException x = TApplicationException.read(iprot_);
+        iprot_.readMessageEnd();
+        throw x;
+      }
+      check_schema_agreement_result result = new 
check_schema_agreement_result();
+      result.read(iprot_);
+      iprot_.readMessageEnd();
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      if (result.ire != null) {
+        throw result.ire;
+      }
+      throw new TApplicationException(TApplicationException.MISSING_RESULT, 
"check_schema_agreement failed: unknown result");
+    }
+
     public Set<String> describe_keyspaces() throws TException
     {
       send_describe_keyspaces();
@@ -1177,6 +1219,7 @@ public class Cassandra {
       processMap_.put("remove", new remove());
       processMap_.put("batch_mutate", new batch_mutate());
       processMap_.put("truncate", new truncate());
+      processMap_.put("check_schema_agreement", new check_schema_agreement());
       processMap_.put("describe_keyspaces", new describe_keyspaces());
       processMap_.put("describe_cluster_name", new describe_cluster_name());
       processMap_.put("describe_version", new describe_version());
@@ -1715,6 +1758,44 @@ public class Cassandra {
 
     }
 
+    private class check_schema_agreement implements ProcessFunction {
+      public void process(int seqid, TProtocol iprot, TProtocol oprot) throws 
TException
+      {
+        check_schema_agreement_args args = new check_schema_agreement_args();
+        try {
+          args.read(iprot);
+        } catch (TProtocolException e) {
+          iprot.readMessageEnd();
+          TApplicationException x = new 
TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+          oprot.writeMessageBegin(new TMessage("check_schema_agreement", 
TMessageType.EXCEPTION, seqid));
+          x.write(oprot);
+          oprot.writeMessageEnd();
+          oprot.getTransport().flush();
+          return;
+        }
+        iprot.readMessageEnd();
+        check_schema_agreement_result result = new 
check_schema_agreement_result();
+        try {
+          result.success = iface_.check_schema_agreement();
+        } catch (InvalidRequestException ire) {
+          result.ire = ire;
+        } catch (Throwable th) {
+          LOGGER.error("Internal error processing check_schema_agreement", th);
+          TApplicationException x = new 
TApplicationException(TApplicationException.INTERNAL_ERROR, "Internal error 
processing check_schema_agreement");
+          oprot.writeMessageBegin(new TMessage("check_schema_agreement", 
TMessageType.EXCEPTION, seqid));
+          x.write(oprot);
+          oprot.writeMessageEnd();
+          oprot.getTransport().flush();
+          return;
+        }
+        oprot.writeMessageBegin(new TMessage("check_schema_agreement", 
TMessageType.REPLY, seqid));
+        result.write(oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+      }
+
+    }
+
     private class describe_keyspaces implements ProcessFunction {
       public void process(int seqid, TProtocol iprot, TProtocol oprot) throws 
TException
       {
@@ -13999,6 +14080,598 @@ public class Cassandra {
 
   }
 
+  public static class check_schema_agreement_args implements 
TBase<check_schema_agreement_args._Fields>, java.io.Serializable, Cloneable, 
Comparable<check_schema_agreement_args>   {
+    private static final TStruct STRUCT_DESC = new 
TStruct("check_schema_agreement_args");
+
+
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+;
+
+      private static final Map<Integer, _Fields> byId = new HashMap<Integer, 
_Fields>();
+      private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byId.put((int)field._thriftId, field);
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        return byId.get(fieldId);
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, FieldMetaData> metaDataMap = 
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class) 
{{
+    }});
+
+    static {
+      FieldMetaData.addStructMetaDataMap(check_schema_agreement_args.class, 
metaDataMap);
+    }
+
+    public check_schema_agreement_args() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public check_schema_agreement_args(check_schema_agreement_args other) {
+    }
+
+    public check_schema_agreement_args deepCopy() {
+      return new check_schema_agreement_args(this);
+    }
+
+    @Deprecated
+    public check_schema_agreement_args clone() {
+      return new check_schema_agreement_args(this);
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
asigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof check_schema_agreement_args)
+        return this.equals((check_schema_agreement_args)that);
+      return false;
+    }
+
+    public boolean equals(check_schema_agreement_args that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(check_schema_agreement_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      check_schema_agreement_args typedOther = 
(check_schema_agreement_args)other;
+
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id) {
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked 
in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("check_schema_agreement_args(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
+  public static class check_schema_agreement_result implements 
TBase<check_schema_agreement_result._Fields>, java.io.Serializable, Cloneable   
{
+    private static final TStruct STRUCT_DESC = new 
TStruct("check_schema_agreement_result");
+
+    private static final TField SUCCESS_FIELD_DESC = new TField("success", 
TType.MAP, (short)0);
+    private static final TField IRE_FIELD_DESC = new TField("ire", 
TType.STRUCT, (short)1);
+
+    public Map<String,List<String>> success;
+    public InvalidRequestException ire;
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+      SUCCESS((short)0, "success"),
+      IRE((short)1, "ire");
+
+      private static final Map<Integer, _Fields> byId = new HashMap<Integer, 
_Fields>();
+      private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byId.put((int)field._thriftId, field);
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        return byId.get(fieldId);
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+
+    public static final Map<_Fields, FieldMetaData> metaDataMap = 
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class) 
{{
+      put(_Fields.SUCCESS, new FieldMetaData("success", 
TFieldRequirementType.DEFAULT, 
+          new MapMetaData(TType.MAP, 
+              new FieldValueMetaData(TType.STRING), 
+              new ListMetaData(TType.LIST, 
+                  new FieldValueMetaData(TType.STRING)))));
+      put(_Fields.IRE, new FieldMetaData("ire", TFieldRequirementType.DEFAULT, 
+          new FieldValueMetaData(TType.STRUCT)));
+    }});
+
+    static {
+      FieldMetaData.addStructMetaDataMap(check_schema_agreement_result.class, 
metaDataMap);
+    }
+
+    public check_schema_agreement_result() {
+    }
+
+    public check_schema_agreement_result(
+      Map<String,List<String>> success,
+      InvalidRequestException ire)
+    {
+      this();
+      this.success = success;
+      this.ire = ire;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public check_schema_agreement_result(check_schema_agreement_result other) {
+      if (other.isSetSuccess()) {
+        Map<String,List<String>> __this__success = new 
HashMap<String,List<String>>();
+        for (Map.Entry<String, List<String>> other_element : 
other.success.entrySet()) {
+
+          String other_element_key = other_element.getKey();
+          List<String> other_element_value = other_element.getValue();
+
+          String __this__success_copy_key = other_element_key;
+
+          List<String> __this__success_copy_value = new ArrayList<String>();
+          for (String other_element_value_element : other_element_value) {
+            __this__success_copy_value.add(other_element_value_element);
+          }
+
+          __this__success.put(__this__success_copy_key, 
__this__success_copy_value);
+        }
+        this.success = __this__success;
+      }
+      if (other.isSetIre()) {
+        this.ire = new InvalidRequestException(other.ire);
+      }
+    }
+
+    public check_schema_agreement_result deepCopy() {
+      return new check_schema_agreement_result(this);
+    }
+
+    @Deprecated
+    public check_schema_agreement_result clone() {
+      return new check_schema_agreement_result(this);
+    }
+
+    public int getSuccessSize() {
+      return (this.success == null) ? 0 : this.success.size();
+    }
+
+    public void putToSuccess(String key, List<String> val) {
+      if (this.success == null) {
+        this.success = new HashMap<String,List<String>>();
+      }
+      this.success.put(key, val);
+    }
+
+    public Map<String,List<String>> getSuccess() {
+      return this.success;
+    }
+
+    public check_schema_agreement_result setSuccess(Map<String,List<String>> 
success) {
+      this.success = success;
+      return this;
+    }
+
+    public void unsetSuccess() {
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (has been asigned a value) and 
false otherwise */
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public InvalidRequestException getIre() {
+      return this.ire;
+    }
+
+    public check_schema_agreement_result setIre(InvalidRequestException ire) {
+      this.ire = ire;
+      return this;
+    }
+
+    public void unsetIre() {
+      this.ire = null;
+    }
+
+    /** Returns true if field ire is set (has been asigned a value) and false 
otherwise */
+    public boolean isSetIre() {
+      return this.ire != null;
+    }
+
+    public void setIreIsSet(boolean value) {
+      if (!value) {
+        this.ire = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((Map<String,List<String>>)value);
+        }
+        break;
+
+      case IRE:
+        if (value == null) {
+          unsetIre();
+        } else {
+          setIre((InvalidRequestException)value);
+        }
+        break;
+
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return getSuccess();
+
+      case IRE:
+        return getIre();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
asigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return isSetSuccess();
+      case IRE:
+        return isSetIre();
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof check_schema_agreement_result)
+        return this.equals((check_schema_agreement_result)that);
+      return false;
+    }
+
+    public boolean equals(check_schema_agreement_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (!this.success.equals(that.success))
+          return false;
+      }
+
+      boolean this_present_ire = true && this.isSetIre();
+      boolean that_present_ire = true && that.isSetIre();
+      if (this_present_ire || that_present_ire) {
+        if (!(this_present_ire && that_present_ire))
+          return false;
+        if (!this.ire.equals(that.ire))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id) {
+          case 0: // SUCCESS
+            if (field.type == TType.MAP) {
+              {
+                TMap _map73 = iprot.readMapBegin();
+                this.success = new HashMap<String,List<String>>(2*_map73.size);
+                for (int _i74 = 0; _i74 < _map73.size; ++_i74)
+                {
+                  String _key75;
+                  List<String> _val76;
+                  _key75 = iprot.readString();
+                  {
+                    TList _list77 = iprot.readListBegin();
+                    _val76 = new ArrayList<String>(_list77.size);
+                    for (int _i78 = 0; _i78 < _list77.size; ++_i78)
+                    {
+                      String _elem79;
+                      _elem79 = iprot.readString();
+                      _val76.add(_elem79);
+                    }
+                    iprot.readListEnd();
+                  }
+                  this.success.put(_key75, _val76);
+                }
+                iprot.readMapEnd();
+              }
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          case 1: // IRE
+            if (field.type == TType.STRUCT) {
+              this.ire = new InvalidRequestException();
+              this.ire.read(iprot);
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked 
in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      oprot.writeStructBegin(STRUCT_DESC);
+
+      if (this.isSetSuccess()) {
+        oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+        {
+          oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, 
this.success.size()));
+          for (Map.Entry<String, List<String>> _iter80 : 
this.success.entrySet())
+          {
+            oprot.writeString(_iter80.getKey());
+            {
+              oprot.writeListBegin(new TList(TType.STRING, 
_iter80.getValue().size()));
+              for (String _iter81 : _iter80.getValue())
+              {
+                oprot.writeString(_iter81);
+              }
+              oprot.writeListEnd();
+            }
+          }
+          oprot.writeMapEnd();
+        }
+        oprot.writeFieldEnd();
+      } else if (this.isSetIre()) {
+        oprot.writeFieldBegin(IRE_FIELD_DESC);
+        this.ire.write(oprot);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("check_schema_agreement_result(");
+      boolean first = true;
+
+      sb.append("success:");
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("ire:");
+      if (this.ire == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.ire);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
   public static class describe_keyspaces_args implements 
TBase<describe_keyspaces_args._Fields>, java.io.Serializable, Cloneable, 
Comparable<describe_keyspaces_args>   {
     private static final TStruct STRUCT_DESC = new 
TStruct("describe_keyspaces_args");
 
@@ -14420,13 +15093,13 @@ public class Cassandra {
           case 0: // SUCCESS
             if (field.type == TType.SET) {
               {
-                TSet _set73 = iprot.readSetBegin();
-                this.success = new HashSet<String>(2*_set73.size);
-                for (int _i74 = 0; _i74 < _set73.size; ++_i74)
+                TSet _set82 = iprot.readSetBegin();
+                this.success = new HashSet<String>(2*_set82.size);
+                for (int _i83 = 0; _i83 < _set82.size; ++_i83)
                 {
-                  String _elem75;
-                  _elem75 = iprot.readString();
-                  this.success.add(_elem75);
+                  String _elem84;
+                  _elem84 = iprot.readString();
+                  this.success.add(_elem84);
                 }
                 iprot.readSetEnd();
               }
@@ -14452,9 +15125,9 @@ public class Cassandra {
         oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
         {
           oprot.writeSetBegin(new TSet(TType.STRING, this.success.size()));
-          for (String _iter76 : this.success)
+          for (String _iter85 : this.success)
           {
-            oprot.writeString(_iter76);
+            oprot.writeString(_iter85);
           }
           oprot.writeSetEnd();
         }
@@ -15961,14 +16634,14 @@ public class Cassandra {
           case 0: // SUCCESS
             if (field.type == TType.LIST) {
               {
-                TList _list77 = iprot.readListBegin();
-                this.success = new ArrayList<TokenRange>(_list77.size);
-                for (int _i78 = 0; _i78 < _list77.size; ++_i78)
+                TList _list86 = iprot.readListBegin();
+                this.success = new ArrayList<TokenRange>(_list86.size);
+                for (int _i87 = 0; _i87 < _list86.size; ++_i87)
                 {
-                  TokenRange _elem79;
-                  _elem79 = new TokenRange();
-                  _elem79.read(iprot);
-                  this.success.add(_elem79);
+                  TokenRange _elem88;
+                  _elem88 = new TokenRange();
+                  _elem88.read(iprot);
+                  this.success.add(_elem88);
                 }
                 iprot.readListEnd();
               }
@@ -15994,9 +16667,9 @@ public class Cassandra {
         oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
         {
           oprot.writeListBegin(new TList(TType.STRUCT, this.success.size()));
-          for (TokenRange _iter80 : this.success)
+          for (TokenRange _iter89 : this.success)
           {
-            _iter80.write(oprot);
+            _iter89.write(oprot);
           }
           oprot.writeListEnd();
         }
@@ -16617,27 +17290,27 @@ public class Cassandra {
           case 0: // SUCCESS
             if (field.type == TType.MAP) {
               {
-                TMap _map81 = iprot.readMapBegin();
-                this.success = new 
HashMap<String,Map<String,String>>(2*_map81.size);
-                for (int _i82 = 0; _i82 < _map81.size; ++_i82)
+                TMap _map90 = iprot.readMapBegin();
+                this.success = new 
HashMap<String,Map<String,String>>(2*_map90.size);
+                for (int _i91 = 0; _i91 < _map90.size; ++_i91)
                 {
-                  String _key83;
-                  Map<String,String> _val84;
-                  _key83 = iprot.readString();
+                  String _key92;
+                  Map<String,String> _val93;
+                  _key92 = iprot.readString();
                   {
-                    TMap _map85 = iprot.readMapBegin();
-                    _val84 = new HashMap<String,String>(2*_map85.size);
-                    for (int _i86 = 0; _i86 < _map85.size; ++_i86)
+                    TMap _map94 = iprot.readMapBegin();
+                    _val93 = new HashMap<String,String>(2*_map94.size);
+                    for (int _i95 = 0; _i95 < _map94.size; ++_i95)
                     {
-                      String _key87;
-                      String _val88;
-                      _key87 = iprot.readString();
-                      _val88 = iprot.readString();
-                      _val84.put(_key87, _val88);
+                      String _key96;
+                      String _val97;
+                      _key96 = iprot.readString();
+                      _val97 = iprot.readString();
+                      _val93.put(_key96, _val97);
                     }
                     iprot.readMapEnd();
                   }
-                  this.success.put(_key83, _val84);
+                  this.success.put(_key92, _val93);
                 }
                 iprot.readMapEnd();
               }
@@ -16671,15 +17344,15 @@ public class Cassandra {
         oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
         {
           oprot.writeMapBegin(new TMap(TType.STRING, TType.MAP, 
this.success.size()));
-          for (Map.Entry<String, Map<String,String>> _iter89 : 
this.success.entrySet())
+          for (Map.Entry<String, Map<String,String>> _iter98 : 
this.success.entrySet())
           {
-            oprot.writeString(_iter89.getKey());
+            oprot.writeString(_iter98.getKey());
             {
-              oprot.writeMapBegin(new TMap(TType.STRING, TType.STRING, 
_iter89.getValue().size()));
-              for (Map.Entry<String, String> _iter90 : 
_iter89.getValue().entrySet())
+              oprot.writeMapBegin(new TMap(TType.STRING, TType.STRING, 
_iter98.getValue().size()));
+              for (Map.Entry<String, String> _iter99 : 
_iter98.getValue().entrySet())
               {
-                oprot.writeString(_iter90.getKey());
-                oprot.writeString(_iter90.getValue());
+                oprot.writeString(_iter99.getKey());
+                oprot.writeString(_iter99.getValue());
               }
               oprot.writeMapEnd();
             }
@@ -17435,13 +18108,13 @@ public class Cassandra {
           case 0: // SUCCESS
             if (field.type == TType.LIST) {
               {
-                TList _list91 = iprot.readListBegin();
-                this.success = new ArrayList<String>(_list91.size);
-                for (int _i92 = 0; _i92 < _list91.size; ++_i92)
+                TList _list100 = iprot.readListBegin();
+                this.success = new ArrayList<String>(_list100.size);
+                for (int _i101 = 0; _i101 < _list100.size; ++_i101)
                 {
-                  String _elem93;
-                  _elem93 = iprot.readString();
-                  this.success.add(_elem93);
+                  String _elem102;
+                  _elem102 = iprot.readString();
+                  this.success.add(_elem102);
                 }
                 iprot.readListEnd();
               }
@@ -17467,9 +18140,9 @@ public class Cassandra {
         oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
         {
           oprot.writeListBegin(new TList(TType.STRING, this.success.size()));
-          for (String _iter94 : this.success)
+          for (String _iter103 : this.success)
           {
-            oprot.writeString(_iter94);
+            oprot.writeString(_iter103);
           }
           oprot.writeListEnd();
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu 
May 20 17:13:17 2010
@@ -51,6 +51,8 @@ import org.apache.cassandra.service.Stor
 import static org.apache.cassandra.utils.FBUtilities.UTF8;
 
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.*;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.apache.cassandra.avro.AvroRecordFactory.*;
@@ -573,4 +575,10 @@ public class CassandraServer implements 
     {
         return API_VERSION;
     }
+    
+    public Map<String, List<String>> check_schema_agreement()
+    {
+        logger.debug("checking schema agreement");      
+        return StorageProxy.checkSchemaAgreement();
+    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
Thu May 20 17:13:17 2010
@@ -84,7 +84,7 @@ public class DatabaseDescriptor
 
     private final static String STORAGE_CONF_FILE = "cassandra.yaml";
 
-    private static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type 
nibble set to 1, everything else to zero.
+    public static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type 
nibble set to 1, everything else to zero.
     private static UUID defsVersion = INITIAL_VERSION;
 
     /**

Added: 
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=946717&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java 
(added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java 
Thu May 20 17:13:17 2010
@@ -0,0 +1,41 @@
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+public class SchemaCheckVerbHandler implements IVerbHandler
+{
+    private final Logger logger = 
LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
+    
+    @Override
+    public void doVerb(Message message)
+    {
+        logger.debug("Received schema check request.");
+        Message response = message.getReply(FBUtilities.getLocalAddress(), 
DatabaseDescriptor.getDefsVersion().toString().getBytes());
+        MessagingService.instance.sendOneWay(response, message.getFrom());
+    }
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu 
May 20 17:13:17 2010
@@ -26,12 +26,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -55,6 +60,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -565,6 +571,76 @@ public class StorageProxy implements Sto
     }
 
     /**
+     * initiate a request/response session with each live node to check 
whether or not everybody is using the same 
+     * migration id. This is useful for determining if a schema change has 
propagated through the cluster. Disagreement
+     * is assumed if any node fails to respond.
+     */
+    public static Map<String, List<String>> checkSchemaAgreement()
+    {
+        final Map<String, List<String>> results = new HashMap<String, 
List<String>>();
+        
+        final String myVersion = 
DatabaseDescriptor.getDefsVersion().toString();
+        final Map<InetAddress, UUID> versions = new 
ConcurrentHashMap<InetAddress, UUID>();
+        final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
+        final Message msg = new Message(FBUtilities.getLocalAddress(), 
StageManager.MIGRATION_STAGE, StorageService.Verb.SCHEMA_CHECK, 
ArrayUtils.EMPTY_BYTE_ARRAY);
+        final CountDownLatch latch = new CountDownLatch(liveHosts.size());
+        // an empty message acts as a request to the SchemaCheckVerbHandler.
+        MessagingService.instance.sendRR(msg, liveHosts.toArray(new 
InetAddress[]{}), new IAsyncCallback() 
+        {
+            @Override
+            public void response(Message msg)
+            {
+                // record the response from the remote node.
+                logger.debug("Received schema check response from " + 
msg.getFrom().getHostAddress());
+                UUID theirVersion = UUID.fromString(new 
String(msg.getMessageBody()));
+                versions.put(msg.getFrom(), theirVersion);
+                latch.countDown();
+            }
+        });
+        
+        try
+        {
+            // wait for as long as possible. timeout-1s if possible.
+            latch.await(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+        } 
+        catch (InterruptedException ex) 
+        {
+            throw new AssertionError("This latch shouldn't have been 
interrupted.");
+        }
+        
+        logger.debug("My version is " + myVersion);
+        
+        // first, indicate any hosts that did not respond.
+        final Set<InetAddress> ackedHosts = versions.keySet();
+        if (ackedHosts.size() < liveHosts.size())
+        {
+            Set<InetAddress> missingHosts = new 
HashSet<InetAddress>(liveHosts);
+            missingHosts.removeAll(ackedHosts);
+            assert missingHosts.size() > 0;
+            List<String> missingHostNames = new 
ArrayList<String>(missingHosts.size());
+            for (InetAddress host : missingHosts)
+                missingHostNames.add(host.getHostAddress());
+            results.put(DatabaseDescriptor.INITIAL_VERSION.toString(), 
missingHostNames);
+            logger.debug("Hosts not in agreement. Didn't get a response from 
everybody: " + StringUtils.join(missingHostNames, ","));
+        }
+        
+        // check for version disagreement. log the hosts that don't agree.
+        for (InetAddress host : ackedHosts)
+        {
+            String uuid = versions.get(host).toString();
+            if (!results.containsKey(uuid))
+                results.put(uuid, new ArrayList<String>());
+            results.get(uuid).add(host.getHostAddress());
+            if (!uuid.equals(myVersion))
+                logger.debug("%s disagrees (%s)", host.getHostAddress(), uuid);
+        }
+        if (results.size() == 1)
+            logger.debug("Schemas are in agreement.");
+        
+        return results;
+    }
+
+    /**
      * returns an iterator that will return ranges in ring order, starting 
with the one that contains the start token
      */
     private static Iterable<Pair<AbstractBounds, List<InetAddress>>> 
getRangeIterator(final List<Pair<AbstractBounds, List<InetAddress>>> ranges, 
Token start)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Thu May 20 17:13:17 2010
@@ -111,6 +111,7 @@ public class StorageService implements I
         DEFINITIONS_ANNOUNCE,
         DEFINITIONS_UPDATE_RESPONSE,
         TRUNCATE,
+        SCHEMA_CHECK,
         ;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
@@ -236,6 +237,7 @@ public class StorageService implements I
         
MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new 
DefinitionsAnnounceVerbHandler());
         
MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE,
 new DefinitionsUpdateResponseVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TRUNCATE, new 
TruncateVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.SCHEMA_CHECK, new 
SchemaCheckVerbHandler());
 
         replicationStrategies = new HashMap<String, 
AbstractReplicationStrategy>();
         for (String table : DatabaseDescriptor.getNonSystemTables())

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
Thu May 20 17:13:17 2010
@@ -856,5 +856,12 @@ public class CassandraServer implements 
         keySpace.set(keyspace); 
     }
 
+    @Override
+    public Map<String, List<String>> check_schema_agreement() throws 
TException, InvalidRequestException
+    {
+        logger.debug("checking schema agreement");      
+        return StorageProxy.checkSchemaAgreement();
+    }
+
     // main method moved to CassandraDaemon
 }


Reply via email to