Author: jbellis Date: Mon Jul 26 16:04:06 2010 New Revision: 979335 URL: http://svn.apache.org/viewvc?rev=979335&view=rev Log: merge from 0.6
Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/contrib/word_count/src/WordCount.java cassandra/trunk/interface/cassandra.thrift cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (contents, props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Jul 26 16:04:06 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6:922689-964141,965151,965457,965537,965604,965630-966676,979156 +/cassandra/branches/cassandra-0.6:922689-965151,965457,965537,965604,965630-966676,979156 /cassandra/trunk:978791 /incubator/cassandra/branches/cassandra-0.3:774578-796573 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Mon Jul 26 16:04:06 2010 @@ -67,6 +67,10 @@ dev (CASSANDRA-685) * add ack to Binary write verb and update CassandraBulkLoader to wait for acks for each row (CASSANDRA-1093) + * added describe_partitioner Thrift method (CASSANDRA-1047) + * Hadoop jobs no longer require the Cassandra storage-conf.xml + (CASSANDRA-1280, CASSANDRA-1047) +* log thread pool stats when GC is excessive (CASSANDRA-1275) 0.6.3 Modified: cassandra/trunk/contrib/word_count/src/WordCount.java URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCount.java?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/contrib/word_count/src/WordCount.java (original) +++ cassandra/trunk/contrib/word_count/src/WordCount.java Mon Jul 26 16:04:06 2010 @@ -129,6 +129,7 @@ public class WordCount extends Configure job.setInputFormatClass(ColumnFamilyInputFormat.class); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i)); + ConfigHelper.setThriftContact(conf, "localhost", 9160); ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes())); ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate); Modified: cassandra/trunk/interface/cassandra.thrift URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/interface/cassandra.thrift (original) +++ cassandra/trunk/interface/cassandra.thrift Mon Jul 26 16:04:06 2010 @@ -534,6 +534,9 @@ service Cassandra { list<TokenRange> describe_ring(1:required string keyspace) throws (1:InvalidRequestException ire), + /** returns the partitioner used by this cluster */ + string describe_partitioner(), + /** describe specified keyspace */ map<string, map<string, string>> describe_keyspace(1:required string keyspace) throws (1:NotFoundException nfe), Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original) +++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Mon Jul 26 16:04:06 2010 @@ -221,6 +221,11 @@ public class Cassandra { public List<TokenRange> describe_ring(String keyspace) throws InvalidRequestException, TException; /** + * returns the partitioner used by this cluster + */ + public String describe_partitioner() throws TException; + + /** * describe specified keyspace * * @param keyspace @@ -328,6 +333,8 @@ public class Cassandra { public void describe_ring(String keyspace, AsyncMethodCallback<AsyncClient.describe_ring_call> resultHandler) throws TException; + public void describe_partitioner(AsyncMethodCallback<AsyncClient.describe_partitioner_call> resultHandler) throws TException; + public void describe_keyspace(String keyspace, AsyncMethodCallback<AsyncClient.describe_keyspace_call> resultHandler) throws TException; public void describe_splits(String keyspace, String cfName, String start_token, String end_token, int keys_per_split, AsyncMethodCallback<AsyncClient.describe_splits_call> resultHandler) throws TException; @@ -1202,6 +1209,41 @@ public class Cassandra { throw new TApplicationException(TApplicationException.MISSING_RESULT, "describe_ring failed: unknown result"); } + public String describe_partitioner() throws TException + { + send_describe_partitioner(); + return recv_describe_partitioner(); + } + + public void send_describe_partitioner() throws TException + { + oprot_.writeMessageBegin(new TMessage("describe_partitioner", TMessageType.CALL, ++seqid_)); + describe_partitioner_args args = new describe_partitioner_args(); + args.write(oprot_); + oprot_.writeMessageEnd(); + oprot_.getTransport().flush(); + } + + public String recv_describe_partitioner() throws TException + { + TMessage msg = iprot_.readMessageBegin(); + if (msg.type == TMessageType.EXCEPTION) { + TApplicationException x = TApplicationException.read(iprot_); + iprot_.readMessageEnd(); + throw x; + } + if (msg.seqid != seqid_) { + throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, "describe_partitioner failed: out of sequence response"); + } + describe_partitioner_result result = new describe_partitioner_result(); + result.read(iprot_); + iprot_.readMessageEnd(); + if (result.isSetSuccess()) { + return result.success; + } + throw new TApplicationException(TApplicationException.MISSING_RESULT, "describe_partitioner failed: unknown result"); + } + public Map<String,Map<String,String>> describe_keyspace(String keyspace) throws NotFoundException, TException { send_describe_keyspace(keyspace); @@ -2205,6 +2247,34 @@ public class Cassandra { } } + public void describe_partitioner(AsyncMethodCallback<describe_partitioner_call> resultHandler) throws TException { + checkReady(); + describe_partitioner_call method_call = new describe_partitioner_call(resultHandler, this, protocolFactory, transport); + manager.call(method_call); + } + + public static class describe_partitioner_call extends TAsyncMethodCall { + public describe_partitioner_call(AsyncMethodCallback<describe_partitioner_call> resultHandler, TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport) throws TException { + super(client, protocolFactory, transport, resultHandler, false); + } + + public void write_args(TProtocol prot) throws TException { + prot.writeMessageBegin(new TMessage("describe_partitioner", TMessageType.CALL, 0)); + describe_partitioner_args args = new describe_partitioner_args(); + args.write(prot); + prot.writeMessageEnd(); + } + + public String getResult() throws TException { + if (getState() != State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + TMemoryInputTransport memoryTransport = new TMemoryInputTransport(getFrameBuffer().array()); + TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return (new Client(prot)).recv_describe_partitioner(); + } + } + public void describe_keyspace(String keyspace, AsyncMethodCallback<describe_keyspace_call> resultHandler) throws TException { checkReady(); describe_keyspace_call method_call = new describe_keyspace_call(keyspace, resultHandler, this, protocolFactory, transport); @@ -2497,6 +2567,7 @@ public class Cassandra { processMap_.put("describe_cluster_name", new describe_cluster_name()); processMap_.put("describe_version", new describe_version()); processMap_.put("describe_ring", new describe_ring()); + processMap_.put("describe_partitioner", new describe_partitioner()); processMap_.put("describe_keyspace", new describe_keyspace()); processMap_.put("describe_splits", new describe_splits()); processMap_.put("system_add_column_family", new system_add_column_family()); @@ -3269,6 +3340,32 @@ public class Cassandra { } + private class describe_partitioner implements ProcessFunction { + public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException + { + describe_partitioner_args args = new describe_partitioner_args(); + try { + args.read(iprot); + } catch (TProtocolException e) { + iprot.readMessageEnd(); + TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage()); + oprot.writeMessageBegin(new TMessage("describe_partitioner", TMessageType.EXCEPTION, seqid)); + x.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + return; + } + iprot.readMessageEnd(); + describe_partitioner_result result = new describe_partitioner_result(); + result.success = iface_.describe_partitioner(); + oprot.writeMessageBegin(new TMessage("describe_partitioner", TMessageType.REPLY, seqid)); + result.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + } + + } + private class describe_keyspace implements ProcessFunction { public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException { @@ -20786,6 +20883,480 @@ public class Cassandra { } + public static class describe_partitioner_args implements TBase<describe_partitioner_args, describe_partitioner_args._Fields>, java.io.Serializable, Cloneable { + private static final TStruct STRUCT_DESC = new TStruct("describe_partitioner_args"); + + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements TFieldIdEnum { +; + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, FieldMetaData> metaDataMap; + static { + Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + FieldMetaData.addStructMetaDataMap(describe_partitioner_args.class, metaDataMap); + } + + public describe_partitioner_args() { + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public describe_partitioner_args(describe_partitioner_args other) { + } + + public describe_partitioner_args deepCopy() { + return new describe_partitioner_args(this); + } + + @Deprecated + public describe_partitioner_args clone() { + return new describe_partitioner_args(this); + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public void setFieldValue(int fieldID, Object value) { + setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value); + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + public Object getFieldValue(int fieldId) { + return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId)); + } + + /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + public boolean isSet(int fieldID) { + return isSet(_Fields.findByThriftIdOrThrow(fieldID)); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof describe_partitioner_args) + return this.equals((describe_partitioner_args)that); + return false; + } + + public boolean equals(describe_partitioner_args that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(describe_partitioner_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + describe_partitioner_args typedOther = (describe_partitioner_args)other; + + return 0; + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) { + default: + TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(TProtocol oprot) throws TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("describe_partitioner_args("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + } + + } + + public static class describe_partitioner_result implements TBase<describe_partitioner_result, describe_partitioner_result._Fields>, java.io.Serializable, Cloneable { + private static final TStruct STRUCT_DESC = new TStruct("describe_partitioner_result"); + + private static final TField SUCCESS_FIELD_DESC = new TField("success", TType.STRING, (short)0); + + public String success; + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements TFieldIdEnum { + SUCCESS((short)0, "success"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 0: // SUCCESS + return SUCCESS; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + + public static final Map<_Fields, FieldMetaData> metaDataMap; + static { + Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.SUCCESS, new FieldMetaData("success", TFieldRequirementType.DEFAULT, + new FieldValueMetaData(TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + FieldMetaData.addStructMetaDataMap(describe_partitioner_result.class, metaDataMap); + } + + public describe_partitioner_result() { + } + + public describe_partitioner_result( + String success) + { + this(); + this.success = success; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public describe_partitioner_result(describe_partitioner_result other) { + if (other.isSetSuccess()) { + this.success = other.success; + } + } + + public describe_partitioner_result deepCopy() { + return new describe_partitioner_result(this); + } + + @Deprecated + public describe_partitioner_result clone() { + return new describe_partitioner_result(this); + } + + public String getSuccess() { + return this.success; + } + + public describe_partitioner_result setSuccess(String success) { + this.success = success; + return this; + } + + public void unsetSuccess() { + this.success = null; + } + + /** Returns true if field success is set (has been asigned a value) and false otherwise */ + public boolean isSetSuccess() { + return this.success != null; + } + + public void setSuccessIsSet(boolean value) { + if (!value) { + this.success = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((String)value); + } + break; + + } + } + + public void setFieldValue(int fieldID, Object value) { + setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value); + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return getSuccess(); + + } + throw new IllegalStateException(); + } + + public Object getFieldValue(int fieldId) { + return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId)); + } + + /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + switch (field) { + case SUCCESS: + return isSetSuccess(); + } + throw new IllegalStateException(); + } + + public boolean isSet(int fieldID) { + return isSet(_Fields.findByThriftIdOrThrow(fieldID)); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof describe_partitioner_result) + return this.equals((describe_partitioner_result)that); + return false; + } + + public boolean equals(describe_partitioner_result that) { + if (that == null) + return false; + + boolean this_present_success = true && this.isSetSuccess(); + boolean that_present_success = true && that.isSetSuccess(); + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (!this.success.equals(that.success)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(describe_partitioner_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + describe_partitioner_result typedOther = (describe_partitioner_result)other; + + lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSuccess()) { lastComparison = TBaseHelper.compareTo(this.success, typedOther.success); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) { + case 0: // SUCCESS + if (field.type == TType.STRING) { + this.success = iprot.readString(); + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + default: + TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(TProtocol oprot) throws TException { + oprot.writeStructBegin(STRUCT_DESC); + + if (this.isSetSuccess()) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + oprot.writeString(this.success); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("describe_partitioner_result("); + boolean first = true; + + sb.append("success:"); + if (this.success == null) { + sb.append("null"); + } else { + sb.append(this.success); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + } + + } + public static class describe_keyspace_args implements TBase<describe_keyspace_args, describe_keyspace_args._Fields>, java.io.Serializable, Cloneable { private static final TStruct STRUCT_DESC = new TStruct("describe_keyspace_args"); Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Jul 26 16:04:06 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-964141,965151,965457,965537,965604,965630-966676,979156 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-965151,965457,965537,965604,965630-966676,979156 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Jul 26 16:04:06 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-964141,965151,965457,965537,965604,965630-966676,979156 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-965151,965457,965537,965604,965630-966676,979156 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Jul 26 16:04:06 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-964141,965151,965457,965537,965604,965630-966676,979156 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-965151,965457,965537,965604,965630-966676,979156 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Jul 26 16:04:06 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-964141,965151,965457,965537,965604,965630-966676,979156 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-965151,965457,965537,965604,965630-966676,979156 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Jul 26 16:04:06 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-964141,965151,965457,965537,965604,965630-966676,979156 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-965151,965457,965537,965604,965630-966676,979156 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350 Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Jul 26 16:04:06 2010 @@ -205,10 +205,9 @@ public class DatabaseDescriptor } try { - Class cls = Class.forName(conf.partitioner); - partitioner = (IPartitioner) cls.getConstructor().newInstance(); + partitioner = newPartitioner(conf.partitioner); } - catch (ClassNotFoundException e) + catch (Exception e) { throw new ConfigurationException("Invalid partitioner class " + conf.partitioner); } @@ -386,6 +385,22 @@ public class DatabaseDescriptor } } + public static IPartitioner newPartitioner(String partitionerClassName) + { + if (!partitionerClassName.contains(".")) + partitionerClassName = "org.apache.cassandra.dht." + partitionerClassName; + + try + { + Class cls = Class.forName(partitionerClassName); + return (IPartitioner) cls.getConstructor().newInstance(); + } + catch (Exception e) + { + throw new RuntimeException("Invalid partitioner class " + partitionerClassName); + } + } + private static IEndpointSnitch createEndpointSnitch(String endpointSnitchClassName) throws ConfigurationException { IEndpointSnitch snitch; @@ -716,19 +731,7 @@ public class DatabaseDescriptor ex.initCause(e); throw ex; } - catch (IllegalAccessException e) - { - ConfigurationException ex = new ConfigurationException(e.getMessage()); - ex.initCause(e); - throw ex; - } - catch (InvocationTargetException e) - { - ConfigurationException ex = new ConfigurationException(e.getMessage()); - ex.initCause(e); - throw ex; - } - catch (NoSuchMethodException e) + catch (Exception e) { ConfigurationException ex = new ConfigurationException(e.getMessage()); ex.initCause(e); Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Mon Jul 26 16:04:06 2010 @@ -32,10 +32,11 @@ import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.*; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.TokenRange; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.*; import org.apache.thrift.TException; @@ -64,11 +65,10 @@ public class ColumnFamilyInputFormat ext { private static final Logger logger = LoggerFactory.getLogger(StorageService.class); - private int splitsize; private String keyspace; private String cfName; - private void validateConfiguration(Configuration conf) + private static void validateConfiguration(Configuration conf) { if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null) { @@ -87,9 +87,8 @@ public class ColumnFamilyInputFormat ext validateConfiguration(conf); // cannonical ranges and nodes holding replicas - List<TokenRange> masterRangeNodes = getRangeMap(ConfigHelper.getInputKeyspace(conf)); + List<TokenRange> masterRangeNodes = getRangeMap(conf); - splitsize = ConfigHelper.getInputSplitSize(context.getConfiguration()); keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration()); cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration()); @@ -103,7 +102,7 @@ public class ColumnFamilyInputFormat ext for (TokenRange range : masterRangeNodes) { // for each range, pick a live owner and ask it to compute bite-sized splits - splitfutures.add(executor.submit(new SplitCallable(range))); + splitfutures.add(executor.submit(new SplitCallable(range, conf))); } // wait until we have all the results back @@ -136,18 +135,19 @@ public class ColumnFamilyInputFormat ext class SplitCallable implements Callable<List<InputSplit>> { - private TokenRange range; - - public SplitCallable(TokenRange tr) + private final TokenRange range; + private final Configuration conf; + + public SplitCallable(TokenRange tr, Configuration conf) { this.range = tr; + this.conf = conf; } - @Override public List<InputSplit> call() throws Exception { ArrayList<InputSplit> splits = new ArrayList<InputSplit>(); - List<String> tokens = getSubSplits(keyspace, cfName, range, splitsize); + List<String> tokens = getSubSplits(keyspace, cfName, range, conf); // turn the sub-ranges into InputSplits String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]); @@ -167,13 +167,13 @@ public class ColumnFamilyInputFormat ext } } - private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, int splitsize) throws IOException + private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException { // TODO handle failure of range replicas & retry - TSocket socket = new TSocket(range.endpoints.get(0), - DatabaseDescriptor.getRpcPort()); + TSocket socket = new TSocket(range.endpoints.get(0), ConfigHelper.getThriftPort(conf)); TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false); Cassandra.Client client = new Cassandra.Client(binaryProtocol); + int splitsize = ConfigHelper.getInputSplitSize(conf); try { socket.open(); @@ -194,10 +194,9 @@ public class ColumnFamilyInputFormat ext return splits; } - private List<TokenRange> getRangeMap(String keyspace) throws IOException + private List<TokenRange> getRangeMap(Configuration conf) throws IOException { - TSocket socket = new TSocket(DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(), - DatabaseDescriptor.getRpcPort()); + TSocket socket = new TSocket(ConfigHelper.getInitialAddress(conf), ConfigHelper.getThriftPort(conf)); TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false); Cassandra.Client client = new Cassandra.Client(binaryProtocol); try @@ -211,7 +210,7 @@ public class ColumnFamilyInputFormat ext List<TokenRange> map; try { - map = client.describe_ring(keyspace); + map = client.describe_ring(ConfigHelper.getInputKeyspace(conf)); } catch (TException e) { @@ -224,7 +223,6 @@ public class ColumnFamilyInputFormat ext return map; } - @Override public RecordReader<byte[], SortedMap<byte[], IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new ColumnFamilyRecordReader(); Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Mon Jul 26 16:04:06 2010 @@ -30,6 +30,8 @@ import com.google.common.collect.Abstrac import org.apache.cassandra.auth.AllowAllAuthenticator; import org.apache.cassandra.auth.SimpleAuthenticator; + +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.clock.AbstractReconciler; @@ -57,6 +59,7 @@ public class ColumnFamilyRecordReader ex private int batchRowCount; // fetch this many per batch private String cfName; private String keyspace; + private Configuration conf; private AuthenticationRequest authRequest; private TSocket socket; private Cassandra.Client client; @@ -90,7 +93,7 @@ public class ColumnFamilyRecordReader ex public void initialize(InputSplit split, TaskAttemptContext context) throws IOException { this.split = (ColumnFamilySplit) split; - Configuration conf = context.getConfiguration(); + conf = context.getConfiguration(); predicate = ConfigHelper.getInputSlicePredicate(conf); totalRowCount = ConfigHelper.getInputSplitSize(conf); batchRowCount = ConfigHelper.getRangeBatchSize(conf); @@ -115,12 +118,36 @@ public class ColumnFamilyRecordReader ex private class RowIterator extends AbstractIterator<Pair<byte[], SortedMap<byte[], IColumn>>> { - private List<KeySlice> rows; private String startToken; private int totalRead = 0; private int i = 0; - private AbstractType comparator = null; + private final AbstractType comparator; + private final AbstractType subComparator; + private final IPartitioner partitioner; + + private RowIterator() + { + try + { + partitioner = DatabaseDescriptor.newPartitioner(client.describe_partitioner()); + Map<String, String> info = client.describe_keyspace(keyspace).get(cfName); + comparator = DatabaseDescriptor.getComparator(info.get("CompareWith")); + subComparator = DatabaseDescriptor.getComparator(info.get("CompareSubcolumnsWith")); + } + catch (ConfigurationException e) + { + throw new RuntimeException("unable to load sub/comparator", e); + } + catch (TException e) + { + throw new RuntimeException("error communicating via Thrift", e); + } + catch (NotFoundException e) + { + throw new RuntimeException("server reports no such keyspace " + keyspace, e); + } + } private void maybeInit() { @@ -172,9 +199,8 @@ public class ColumnFamilyRecordReader ex // prepare for the next slice to be read KeySlice lastRow = rows.get(rows.size() - 1); - IPartitioner p = DatabaseDescriptor.getPartitioner(); byte[] rowkey = lastRow.getKey(); - startToken = p.getTokenFactory().toString(p.getToken(rowkey)); + startToken = partitioner.getTokenFactory().toString(partitioner.getToken(rowkey)); } catch (Exception e) { @@ -206,17 +232,6 @@ public class ColumnFamilyRecordReader ex { client.login(authRequest); } - - // Get the keyspace information to get the comparator - if (comparator == null) - { - Map<String, Map<String,String>> desc = client.describe_keyspace(keyspace); - Map<String,String> ksProps = desc.get(cfName); - String compClass = ksProps.get("CompareWith"); - // Get the singleton instance of the AbstractType subclass - Class<?> c = Class.forName(compClass); - comparator = (AbstractType) c.getField("instance").get(c); - } } @@ -280,35 +295,34 @@ public class ColumnFamilyRecordReader ex } return new Pair<byte[], SortedMap<byte[], IColumn>>(ks.key, map); } - } - private IColumn unthriftify(ColumnOrSuperColumn cosc) - { - if (cosc.column == null) - return unthriftifySuper(cosc.super_column); - return unthriftifySimple(cosc.column); - } + private IColumn unthriftify(ColumnOrSuperColumn cosc) + { + if (cosc.column == null) + return unthriftifySuper(cosc.super_column); + return unthriftifySimple(cosc.column); + } - private IColumn unthriftifySuper(SuperColumn super_column) - { - AbstractType subComparator = DatabaseDescriptor.getSubComparator(keyspace, cfName); - ClockType clockType = DatabaseDescriptor.getClockType(keyspace, cfName); - AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(keyspace, cfName); - org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType, reconciler); - for (Column column : super_column.columns) + private IColumn unthriftifySuper(SuperColumn super_column) { - sc.addColumn(unthriftifySimple(column)); + ClockType clockType = DatabaseDescriptor.getClockType(keyspace, cfName); + AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(keyspace, cfName); + org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType, reconciler); + for (Column column : super_column.columns) + { + sc.addColumn(unthriftifySimple(column)); + } + return sc; } - return sc; - } - private IColumn unthriftifySimple(Column column) - { - return new org.apache.cassandra.db.Column(column.name, column.value, unthriftifyClock(column.clock)); - } + private IColumn unthriftifySimple(Column column) + { + return new org.apache.cassandra.db.Column(column.name, column.value, unthriftifyClock(column.clock)); + } - private static IClock unthriftifyClock(Clock clock) - { - return new org.apache.cassandra.db.TimestampClock(clock.getTimestamp()); + private IClock unthriftifyClock(Clock clock) + { + return new org.apache.cassandra.db.TimestampClock(clock.getTimestamp()); + } } } Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Jul 26 16:04:06 2010 @@ -21,6 +21,11 @@ package org.apache.cassandra.hadoop; */ +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.utils.FBUtilities; import org.apache.hadoop.conf.Configuration; @@ -42,14 +47,20 @@ public class ConfigHelper private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate"; private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate"; private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size"; - private static final int DEFAULT_SPLIT_SIZE = 64*1024; + private static final int DEFAULT_SPLIT_SIZE = 64 * 1024; private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size"; private static final int DEFAULT_RANGE_BATCH_SIZE = 4096; + private static final String THRIFT_PORT = "cassandra.thrift.port"; + private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address"; + private static final String COMPARATOR = "cassandra.input.comparator"; + private static final String SUB_COMPARATOR = "cassandra.input.subcomparator"; + private static final String PARTITIONER = "cassandra.partitioner"; /** * Set the keyspace and column family for the input of this job. + * Comparator and Partitioner types will be read from storage-conf.xml. * - * @param conf Job configuration you are about to run + * @param conf Job configuration you are about to run * @param keyspace * @param columnFamily */ @@ -91,12 +102,27 @@ public class ConfigHelper } /** + * The address and port of a Cassandra node that Hadoop can contact over Thrift + * to learn more about the Cassandra cluster. Optional when storage-conf.xml + * is provided. + * + * @param conf + * @param address + * @param port + */ + public static void setThriftContact(Configuration conf, String address, int port) + { + conf.set(THRIFT_PORT, String.valueOf(port)); + conf.set(INITIAL_THRIFT_ADDRESS, address); + } + + /** * The number of rows to request with each get range slices request. * Too big and you can either get timeouts when it takes Cassandra too * long to fetch all the data. Too small and the performance - * will be eaten up by the overhead of each request. + * will be eaten up by the overhead of each request. * - * @param conf Job configuration you are about to run + * @param conf Job configuration you are about to run * @param batchsize Number of rows to request each time */ public static void setRangeBatchSize(Configuration conf, int batchsize) @@ -108,7 +134,7 @@ public class ConfigHelper * The number of rows to request with each get range slices request. * Too big and you can either get timeouts when it takes Cassandra too * long to fetch all the data. Too small and the performance - * will be eaten up by the overhead of each request. + * will be eaten up by the overhead of each request. * * @param conf Job configuration you are about to run * @return Number of rows to request each time @@ -117,13 +143,13 @@ public class ConfigHelper { return conf.getInt(RANGE_BATCH_SIZE_CONFIG, DEFAULT_RANGE_BATCH_SIZE); } - + /** * Set the size of the input split. * This affects the number of maps created, if the number is too small * the overhead of each map will take up the bulk of the job time. * - * @param conf Job configuration you are about to run + * @param conf Job configuration you are about to run * @param splitsize Size of the input split */ public static void setInputSplitSize(Configuration conf, int splitsize) @@ -139,7 +165,7 @@ public class ConfigHelper /** * Set the predicate that determines what columns will be selected from each row. * - * @param conf Job configuration you are about to run + * @param conf Job configuration you are about to run * @param predicate */ public static void setInputSlicePredicate(Configuration conf, SlicePredicate predicate) @@ -222,4 +248,16 @@ public class ConfigHelper { return conf.get(OUTPUT_COLUMNFAMILY_CONFIG); } + + public static int getThriftPort(Configuration conf) + { + String v = conf.get(THRIFT_PORT); + return v == null ? DatabaseDescriptor.getRpcPort() : Integer.valueOf(v); + } + + public static String getInitialAddress(Configuration conf) + { + String v = conf.get(INITIAL_THRIFT_ADDRESS); + return v == null ? DatabaseDescriptor.getSeeds().iterator().next().getHostAddress() : v; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=979335&r1=979334&r2=979335&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Mon Jul 26 16:04:06 2010 @@ -675,6 +675,11 @@ public class CassandraServer implements return ranges; } + public String describe_partitioner() throws TException + { + return StorageService.getPartitioner().getClass().getName(); + } + public List<String> describe_splits(String keyspace, String cfName, String start_token, String end_token, int keys_per_split) throws TException { Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();