Author: brandonwilliams Date: Tue Nov 29 18:41:38 2011 New Revision: 1208021
URL: http://svn.apache.org/viewvc?rev=1208021&view=rev Log: Add old-style api support to CFIF and CFRR. Patch by Steeve Morin, reviewed by brandonwilliams for CASSANDRA-2799 Modified: cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1208021&r1=1208020&r2=1208021&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Tue Nov 29 18:41:38 2011 @@ -1,7 +1,12 @@ -1.0.5 +1.0.6 * add command to stop compactions (CASSANDRA-1740) * filter out unavailable cipher suites when using encryption (CASSANDRA-3178) - * fix assertion error when forwarding to local nodes (CASSANDRA-3539) + * (HADOOP) add old-style api support for CFIF and CFRR (CASSANDRA-2799) + +1.0.5 + * revert CASSANDRA-3407 (see CASSANDRA-3540) + * fix assertion error while forwarding writes to local nodes (CASSANDRA-3539) + 1.0.4 * fix self-hinting of timed out read repair updates and make hinted handoff Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1208021&r1=1208020&r2=1208021&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original) +++ 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Nov 29 18:41:38 2011 @@ -44,11 +44,14 @@ import org.apache.cassandra.thrift.KeyRa import org.apache.cassandra.thrift.TokenRange; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,9 +74,18 @@ import org.slf4j.LoggerFactory; * The default split size is 64k rows. */ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>> + implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>> { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); + public static final String MAPRED_TASK_ID = "mapred.task.id"; + // The simple fact that we need this is because the old Hadoop API wants us to "write" + // to the key and value whereas the new one asks for it. + // I choose 8kb as the default max key size (instantiated only once), but you can + // override it in your jobConf with this setting. 
+ public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size"; + public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192; + private String keyspace; private String cfName; @@ -262,10 +274,39 @@ public class ColumnFamilyInputFormat ext return map; } - - public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new ColumnFamilyRecordReader(); } + + + // + // Old Hadoop API + // + public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException + { + TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID()); + List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac); + org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()]; + for (int i = 0; i < newInputSplits.size(); i++) + oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i); + return oldInputSplits; + } + + public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException + { + TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID))) + { + @Override + public void progress() + { + reporter.progress(); + } + }; + + ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT)); + recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac); + return recordReader; + } + } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1208021&r1=1208020&r2=1208021&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Nov 29 18:41:38 2011 @@ -51,7 +51,10 @@ import org.apache.thrift.transport.TFram import org.apache.thrift.transport.TSocket; public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> + implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> { + public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192; + private ColumnFamilySplit split; private RowIterator iter; private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow; @@ -64,6 +67,18 @@ public class ColumnFamilyRecordReader ex private TSocket socket; private Cassandra.Client client; private ConsistencyLevel consistencyLevel; + private int keyBufferSize = 8192; + + public ColumnFamilyRecordReader() + { + this(ColumnFamilyRecordReader.CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT); + } + + public ColumnFamilyRecordReader(int keyBufferSize) + { + super(); + this.keyBufferSize = keyBufferSize; + } public void close() { @@ -387,4 +402,41 @@ public class ColumnFamilyRecordReader ex return sc; } } + + + // Because the old Hadoop API wants us to write to the key and value + // and the new asks for them, we need to copy the output of the new API + // to the old. Thus, expect a small performance hit. + // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat + // and ColumnFamilyRecordReader don't support them, it should be fine for now. 
+ public boolean next(ByteBuffer key, SortedMap<ByteBuffer, IColumn> value) throws IOException + { + if (this.nextKeyValue()) + { + key.clear(); + key.put(this.getCurrentKey()); + key.rewind(); + + value.clear(); + value.putAll(this.getCurrentValue()); + + return true; + } + return false; + } + + public ByteBuffer createKey() + { + return ByteBuffer.wrap(new byte[this.keyBufferSize]); + } + + public SortedMap<ByteBuffer, IColumn> createValue() + { + return new TreeMap<ByteBuffer, IColumn>(); + } + + public long getPos() throws IOException + { + return (long)iter.rowsRead(); + } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=1208021&r1=1208020&r2=1208021&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Tue Nov 29 18:41:38 2011 @@ -21,15 +21,15 @@ package org.apache.cassandra.hadoop; */ +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; - -public class ColumnFamilySplit extends InputSplit implements Writable +public class ColumnFamilySplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit { private String startToken; private String endToken;