Author: jbellis
Date: Thu Jul 15 03:29:50 2010
New Revision: 964293

URL: http://svn.apache.org/viewvc?rev=964293&view=rev
Log:
replace comparator, partitioner configuration variables with introspection of 
Cassandra server.  patch by jbellis; reviewed by Jeremy Hanna for CASSANDRA-1047

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
    cassandra/branches/cassandra-0.6/interface/cassandra.thrift
    cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
    cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Jul 15 03:29:50 2010
@@ -5,6 +5,9 @@
    (CASSANDRA-1232)
  * extend option to lower compaction priority to hinted handoff
    as well (CASSANDRA-1260)
+ * added describe_partitioner Thrift method (CASSANDRA-1047)
+ * Hadoop jobs no longer require the Cassandra storage-conf.xml
+   (CASSANDRA-1280, CASSANDRA-1047)
 * log thread pool stats when GC is excessive (CASSANDRA-1275)
 
 

Modified: cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java (original)
+++ cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java Thu Jul 15 03:29:50 2010
@@ -129,7 +129,7 @@ public class WordCount extends Configure
             FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + 
i));
 
             ConfigHelper.setThriftContact(conf, "localhost",  9160);
-            ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, 
COLUMN_FAMILY, "BytesType", "RandomPartitioner");
+            ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, 
COLUMN_FAMILY);
             SlicePredicate predicate = new 
SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
             ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
 

Modified: cassandra/branches/cassandra-0.6/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/cassandra.thrift?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/interface/cassandra.thrift (original)
+++ cassandra/branches/cassandra-0.6/interface/cassandra.thrift Thu Jul 15 03:29:50 2010
@@ -46,7 +46,7 @@ namespace rb CassandraThrift
 #           for every edit that doesn't result in a change to major/minor.
 #
 # See the Semantic Versioning Specification (SemVer) http://semver.org.
-const string VERSION = "2.1.0"
+const string VERSION = "2.2.0"
 
 #
 # data structures
@@ -446,6 +446,9 @@ service Cassandra {
   list<TokenRange> describe_ring(1:required string keyspace)
                    throws (1:InvalidRequestException ire),
 
+  /** returns the partitioner used by this cluster */
+  string describe_partitioner(),
+
   /** describe specified keyspace */
   map<string, map<string, string>> describe_keyspace(1:required string 
keyspace)
                                    throws (1:NotFoundException nfe),

Modified: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original)
+++ cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Thu Jul 15 03:29:50 2010
@@ -228,6 +228,11 @@ public class Cassandra {
     public List<TokenRange> describe_ring(String keyspace) throws 
InvalidRequestException, TException;
 
     /**
+     * returns the partitioner used by this cluster
+     */
+    public String describe_partitioner() throws TException;
+
+    /**
      * describe specified keyspace
      * 
      * @param keyspace
@@ -1005,6 +1010,38 @@ public class Cassandra {
       throw new TApplicationException(TApplicationException.MISSING_RESULT, 
"describe_ring failed: unknown result");
     }
 
+    public String describe_partitioner() throws TException
+    {
+      send_describe_partitioner();
+      return recv_describe_partitioner();
+    }
+
+    public void send_describe_partitioner() throws TException
+    {
+      oprot_.writeMessageBegin(new TMessage("describe_partitioner", 
TMessageType.CALL, seqid_));
+      describe_partitioner_args args = new describe_partitioner_args();
+      args.write(oprot_);
+      oprot_.writeMessageEnd();
+      oprot_.getTransport().flush();
+    }
+
+    public String recv_describe_partitioner() throws TException
+    {
+      TMessage msg = iprot_.readMessageBegin();
+      if (msg.type == TMessageType.EXCEPTION) {
+        TApplicationException x = TApplicationException.read(iprot_);
+        iprot_.readMessageEnd();
+        throw x;
+      }
+      describe_partitioner_result result = new describe_partitioner_result();
+      result.read(iprot_);
+      iprot_.readMessageEnd();
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      throw new TApplicationException(TApplicationException.MISSING_RESULT, 
"describe_partitioner failed: unknown result");
+    }
+
     public Map<String,Map<String,String>> describe_keyspace(String keyspace) 
throws NotFoundException, TException
     {
       send_describe_keyspace(keyspace);
@@ -1100,6 +1137,7 @@ public class Cassandra {
       processMap_.put("describe_cluster_name", new describe_cluster_name());
       processMap_.put("describe_version", new describe_version());
       processMap_.put("describe_ring", new describe_ring());
+      processMap_.put("describe_partitioner", new describe_partitioner());
       processMap_.put("describe_keyspace", new describe_keyspace());
       processMap_.put("describe_splits", new describe_splits());
     }
@@ -1802,6 +1840,32 @@ public class Cassandra {
 
     }
 
+    private class describe_partitioner implements ProcessFunction {
+      public void process(int seqid, TProtocol iprot, TProtocol oprot) throws 
TException
+      {
+        describe_partitioner_args args = new describe_partitioner_args();
+        try {
+          args.read(iprot);
+        } catch (TProtocolException e) {
+          iprot.readMessageEnd();
+          TApplicationException x = new 
TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+          oprot.writeMessageBegin(new TMessage("describe_partitioner", 
TMessageType.EXCEPTION, seqid));
+          x.write(oprot);
+          oprot.writeMessageEnd();
+          oprot.getTransport().flush();
+          return;
+        }
+        iprot.readMessageEnd();
+        describe_partitioner_result result = new describe_partitioner_result();
+        result.success = iface_.describe_partitioner();
+        oprot.writeMessageBegin(new TMessage("describe_partitioner", 
TMessageType.REPLY, seqid));
+        result.write(oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+      }
+
+    }
+
     private class describe_keyspace implements ProcessFunction {
       public void process(int seqid, TProtocol iprot, TProtocol oprot) throws 
TException
       {
@@ -18727,6 +18791,476 @@ public class Cassandra {
 
   }
 
+  public static class describe_partitioner_args implements 
TBase<describe_partitioner_args._Fields>, java.io.Serializable, Cloneable, 
Comparable<describe_partitioner_args>   {
+    private static final TStruct STRUCT_DESC = new 
TStruct("describe_partitioner_args");
+
+
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+;
+
+      private static final Map<Integer, _Fields> byId = new HashMap<Integer, 
_Fields>();
+      private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byId.put((int)field._thriftId, field);
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        return byId.get(fieldId);
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, FieldMetaData> metaDataMap = 
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class) 
{{
+    }});
+
+    static {
+      FieldMetaData.addStructMetaDataMap(describe_partitioner_args.class, 
metaDataMap);
+    }
+
+    public describe_partitioner_args() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public describe_partitioner_args(describe_partitioner_args other) {
+    }
+
+    public describe_partitioner_args deepCopy() {
+      return new describe_partitioner_args(this);
+    }
+
+    @Deprecated
+    public describe_partitioner_args clone() {
+      return new describe_partitioner_args(this);
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
asigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof describe_partitioner_args)
+        return this.equals((describe_partitioner_args)that);
+      return false;
+    }
+
+    public boolean equals(describe_partitioner_args that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(describe_partitioner_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      describe_partitioner_args typedOther = (describe_partitioner_args)other;
+
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id) {
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked 
in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("describe_partitioner_args(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
+  public static class describe_partitioner_result implements 
TBase<describe_partitioner_result._Fields>, java.io.Serializable, Cloneable, 
Comparable<describe_partitioner_result>   {
+    private static final TStruct STRUCT_DESC = new 
TStruct("describe_partitioner_result");
+
+    private static final TField SUCCESS_FIELD_DESC = new TField("success", 
TType.STRING, (short)0);
+
+    public String success;
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+      SUCCESS((short)0, "success");
+
+      private static final Map<Integer, _Fields> byId = new HashMap<Integer, 
_Fields>();
+      private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byId.put((int)field._thriftId, field);
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        return byId.get(fieldId);
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+
+    public static final Map<_Fields, FieldMetaData> metaDataMap = 
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class) 
{{
+      put(_Fields.SUCCESS, new FieldMetaData("success", 
TFieldRequirementType.DEFAULT, 
+          new FieldValueMetaData(TType.STRING)));
+    }});
+
+    static {
+      FieldMetaData.addStructMetaDataMap(describe_partitioner_result.class, 
metaDataMap);
+    }
+
+    public describe_partitioner_result() {
+    }
+
+    public describe_partitioner_result(
+      String success)
+    {
+      this();
+      this.success = success;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public describe_partitioner_result(describe_partitioner_result other) {
+      if (other.isSetSuccess()) {
+        this.success = other.success;
+      }
+    }
+
+    public describe_partitioner_result deepCopy() {
+      return new describe_partitioner_result(this);
+    }
+
+    @Deprecated
+    public describe_partitioner_result clone() {
+      return new describe_partitioner_result(this);
+    }
+
+    public String getSuccess() {
+      return this.success;
+    }
+
+    public describe_partitioner_result setSuccess(String success) {
+      this.success = success;
+      return this;
+    }
+
+    public void unsetSuccess() {
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (has been asigned a value) and 
false otherwise */
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((String)value);
+        }
+        break;
+
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return getSuccess();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
asigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return isSetSuccess();
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof describe_partitioner_result)
+        return this.equals((describe_partitioner_result)that);
+      return false;
+    }
+
+    public boolean equals(describe_partitioner_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (!this.success.equals(that.success))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(describe_partitioner_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      describe_partitioner_result typedOther = 
(describe_partitioner_result)other;
+
+      lastComparison = 
Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetSuccess()) {        lastComparison = 
TBaseHelper.compareTo(success, typedOther.success);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id) {
+          case 0: // SUCCESS
+            if (field.type == TType.STRING) {
+              this.success = iprot.readString();
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked 
in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      oprot.writeStructBegin(STRUCT_DESC);
+
+      if (this.isSetSuccess()) {
+        oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+        oprot.writeString(this.success);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("describe_partitioner_result(");
+      boolean first = true;
+
+      sb.append("success:");
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
   public static class describe_keyspace_args implements 
TBase<describe_keyspace_args._Fields>, java.io.Serializable, Cloneable, 
Comparable<describe_keyspace_args>   {
     private static final TStruct STRUCT_DESC = new 
TStruct("describe_keyspace_args");
 

Modified: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java (original)
+++ cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java Thu Jul 15 03:29:50 2010
@@ -42,6 +42,6 @@ import org.slf4j.LoggerFactory;
 
 public class Constants {
 
-  public static final String VERSION = "2.1.0";
+  public static final String VERSION = "2.2.0";
 
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Thu Jul 15 03:29:50 2010
@@ -25,10 +25,13 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
 import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
@@ -40,9 +43,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
 
 public class ColumnFamilyRecordReader extends RecordReader<String, 
SortedMap<byte[], IColumn>>
 {
@@ -104,34 +107,44 @@ public class ColumnFamilyRecordReader ex
         private String startToken;
         private int totalRead = 0;
         private int i = 0;
-        private AbstractType comparator = ConfigHelper.getComparator(conf);
-        private AbstractType subComparator = 
ConfigHelper.getSubComparator(conf);
-        private IPartitioner partitioner = ConfigHelper.getPartitioner(conf);
+        private final AbstractType comparator;
+        private final AbstractType subComparator;
+        private final IPartitioner partitioner;
         private TSocket socket;
+        private Cassandra.Client client;
 
-        private void maybeInit()
+        private RowIterator()
         {
-            // check if we need another batch 
-            if (rows != null && i >= rows.size())
-                rows = null;
-            
-            if (rows != null)
-                return;
-            
-            // close previous connection if one is open
-            close();
-            
             socket = new TSocket(getLocation(), 
ConfigHelper.getThriftPort(conf));
             TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, 
false, false);
-            Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+            client = new Cassandra.Client(binaryProtocol);
+
             try
             {
                 socket.open();
+                partitioner = 
DatabaseDescriptor.newPartitioner(client.describe_partitioner());
+                Map<String, String> info = 
client.describe_keyspace(keyspace).get(cfName);
+                comparator = 
DatabaseDescriptor.getComparator(info.get("CompareWith"));
+                subComparator = 
DatabaseDescriptor.getComparator(info.get("CompareSubcolumnsWith"));
             }
-            catch (TTransportException e)
+            catch (TException e)
             {
-                throw new RuntimeException(e);
+                throw new RuntimeException("error communicating via Thrift", 
e);
             }
+            catch (NotFoundException e)
+            {
+                throw new RuntimeException("server reports no such keyspace " 
+ keyspace, e);
+            }
+        }
+
+        private void maybeInit()
+        {
+            // check if we need another batch 
+            if (rows != null && i >= rows.size())
+                rows = null;
+            
+            if (rows != null)
+                return;
             
             if (startToken == null)
             {

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Thu Jul 15 03:29:50 2010
@@ -51,22 +51,6 @@ public class ConfigHelper
     private static final String PARTITIONER = "cassandra.partitioner";
 
     /**
-     * Set the keyspace, column family, column comparator, and row partitioner 
for this job.
-     *
-     * @param conf         Job configuration you are about to run
-     * @param keyspace
-     * @param columnFamily
-     * @param comparator
-     * @param partitioner
-     */
-    public static void setColumnFamily(Configuration conf, String keyspace, 
String columnFamily, String comparator, String partitioner)
-    {
-        setColumnFamily(conf, keyspace, columnFamily);
-        conf.set(COMPARATOR, comparator);
-        conf.set(PARTITIONER, partitioner);
-    }
-
-    /**
      * Set the keyspace and column family for this job.
      * Comparator and Partitioner types will be read from storage-conf.xml.
      *
@@ -97,18 +81,6 @@ public class ConfigHelper
     }
 
     /**
-     * Set the subcomparator to use in the configured ColumnFamily [of 
SuperColumns].
-     * Optional when storage-conf.xml is provided.
-     *
-     * @param conf
-     * @param subComparator
-     */
-    public static void setSubComparator(Configuration conf, String 
subComparator)
-    {
-        conf.set(SUB_COMPARATOR, subComparator);
-    }
-
-    /**
      * The address and port of a Cassandra node that Hadoop can contact over 
Thrift
      * to learn more about the Cassandra cluster.  Optional when 
storage-conf.xml
      * is provided.
@@ -237,26 +209,4 @@ public class ConfigHelper
         String v = conf.get(INITIAL_THRIFT_ADDRESS);
         return v == null ? 
DatabaseDescriptor.getSeeds().iterator().next().getHostAddress() : v;
     }
-
-    public static AbstractType getComparator(Configuration conf)
-    {
-        String v = conf.get(COMPARATOR);
-        return v == null
-               ? DatabaseDescriptor.getComparator(getKeyspace(conf), 
getColumnFamily(conf))
-               : DatabaseDescriptor.getComparator(v);
-    }
-
-    public static AbstractType getSubComparator(Configuration conf)
-    {
-        String v = conf.get(SUB_COMPARATOR);
-        return v == null
-               ? DatabaseDescriptor.getSubComparator(getKeyspace(conf), 
getColumnFamily(conf))
-               : DatabaseDescriptor.getComparator(v);
-    }
-
-    public static IPartitioner getPartitioner(Configuration conf)
-    {
-        String v = conf.get(PARTITIONER);
-        return v == null ? DatabaseDescriptor.getPartitioner() : 
DatabaseDescriptor.newPartitioner(v);
-    }
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java Thu Jul 15 03:29:50 2010
@@ -633,6 +633,11 @@ public class CassandraServer implements 
         return ranges;
     }
 
+    public String describe_partitioner() throws TException
+    {
+        return StorageService.getPartitioner().getClass().getName();
+    }
+
     public List<String> describe_splits(String start_token, String end_token, 
int keys_per_split) throws TException
     {
         Token.TokenFactory tf = 
StorageService.getPartitioner().getTokenFactory();
@@ -668,6 +673,5 @@ public class CassandraServer implements 
         if (!loginDone.get()) throw new InvalidRequestException("Login is 
required before any other API calls");
     }
 
-    
     // main method moved to CassandraDaemon
 }


Reply via email to