Author: jbellis Date: Wed Jul 14 21:58:16 2010 New Revision: 964217 URL: http://svn.apache.org/viewvc?rev=964217&view=rev Log: make storage-conf.xml optional for Hadoop jobs. patch by jbellis; reviewed by Jeremy Hanna for CASSANDRA-1280
Modified: cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Modified: cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java?rev=964217&r1=964216&r2=964217&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java (original) +++ cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java Wed Jul 14 21:58:16 2010 @@ -128,7 +128,8 @@ public class WordCount extends Configure job.setInputFormatClass(ColumnFamilyInputFormat.class); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i)); - ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); + ConfigHelper.setThriftContact(conf, "localhost", 9160); + ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, "BytesType", "RandomPartitioner"); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes())); ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate); Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=964217&r1=964216&r2=964217&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jul 14 21:58:16 2010 @@ -268,10 +268,9 @@ public class DatabaseDescriptor } try { - Class cls = Class.forName(partitionerClassName); - partitioner = (IPartitioner) cls.getConstructor().newInstance(); + partitioner = newPartitioner(partitionerClassName); } - catch (ClassNotFoundException e) + catch (Exception e) { throw new ConfigurationException("Invalid partitioner class " + partitionerClassName); } @@ -544,6 +543,22 @@ public class DatabaseDescriptor } } + public static IPartitioner newPartitioner(String partitionerClassName) + { + if (!partitionerClassName.contains(".")) + partitionerClassName = "org.apache.cassandra.dht." + partitionerClassName; + + try + { + Class cls = Class.forName(partitionerClassName); + return (IPartitioner) cls.getConstructor().newInstance(); + } + catch (Exception e) + { + throw new RuntimeException("Invalid partitioner class " + partitionerClassName); + } + } + private static void readTablesFromXml() throws ConfigurationException { XMLUtils xmlUtils = null; @@ -752,9 +767,7 @@ public class DatabaseDescriptor } private static AbstractType getComparator(Node columnFamily, String attr) throws ConfigurationException -// throws ConfigurationException, TransformerException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException { - Class<? extends AbstractType> typeClass; String compareWith = null; try { @@ -766,49 +779,38 @@ public class DatabaseDescriptor ex.initCause(e); throw ex; } - if (compareWith == null) - { - typeClass = BytesType.class; - } - else - { - String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith; - try - { - typeClass = (Class<? extends AbstractType>)Class.forName(className); - } - catch (ClassNotFoundException e) - { - throw new ConfigurationException("Unable to load class " + className + " for " + attr + " attribute"); - } - } + try { - return typeClass.getConstructor().newInstance(); - } - catch (InstantiationException e) - { - ConfigurationException ex = new ConfigurationException(e.getMessage()); - ex.initCause(e); - throw ex; + return getComparator(compareWith); } - catch (IllegalAccessException e) + catch (Exception e) { ConfigurationException ex = new ConfigurationException(e.getMessage()); ex.initCause(e); throw ex; } - catch (InvocationTargetException e) + } + + public static AbstractType getComparator(String compareWith) + { + Class<? extends AbstractType> typeClass; + try { - ConfigurationException ex = new ConfigurationException(e.getMessage()); - ex.initCause(e); - throw ex; + if (compareWith == null) + { + typeClass = BytesType.class; + } + else + { + String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith; + typeClass = (Class<? extends AbstractType>)Class.forName(className); + } + return typeClass.getConstructor().newInstance(); } - catch (NoSuchMethodException e) + catch (Exception e) { - ConfigurationException ex = new ConfigurationException(e.getMessage()); - ex.initCause(e); - throw ex; + throw new RuntimeException(e); } } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=964217&r1=964216&r2=964217&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Wed Jul 14 21:58:16 2010 @@ -31,10 +31,11 @@ import java.util.concurrent.Future; import org.apache.log4j.Logger; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.*; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.TokenRange; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.*; import org.apache.thrift.TException; @@ -61,10 +62,9 @@ import org.apache.thrift.transport.TTran */ public class ColumnFamilyInputFormat extends InputFormat<String, SortedMap<byte[], IColumn>> { - private static final Logger logger = Logger.getLogger(StorageService.class); - private void validateConfiguration(Configuration conf) + private static void validateConfiguration(Configuration conf) { if (ConfigHelper.getKeyspace(conf) == null || ConfigHelper.getColumnFamily(conf) == null) { @@ -83,9 +83,9 @@ public class ColumnFamilyInputFormat ext validateConfiguration(conf); // cannonical ranges and nodes holding replicas - List<TokenRange> masterRangeNodes = getRangeMap(ConfigHelper.getKeyspace(conf)); + List<TokenRange> masterRangeNodes = getRangeMap(conf); - int splitsize = ConfigHelper.getInputSplitSize(context.getConfiguration()); + int splitsize = ConfigHelper.getInputSplitSize(conf); // cannonical ranges, split into pieces, fetching the splits in parallel ExecutorService executor = Executors.newCachedThreadPool(); @@ -97,7 +97,7 @@ public class ColumnFamilyInputFormat ext for (TokenRange range : masterRangeNodes) { // for each range, pick a live owner and ask it to compute bite-sized splits - splitfutures.add(executor.submit(new SplitCallable(range, splitsize))); + splitfutures.add(executor.submit(new SplitCallable(range, splitsize, conf))); } // wait until we have all the results back @@ -130,16 +130,17 @@ public class ColumnFamilyInputFormat ext class SplitCallable implements Callable<List<InputSplit>> { - private TokenRange range; - private int splitsize; - - public SplitCallable(TokenRange tr, int splitsize) + private final TokenRange range; + private final int splitsize; + private final Configuration conf; + + public SplitCallable(TokenRange tr, int splitsize, Configuration conf) { this.range = tr; this.splitsize = splitsize; + this.conf = conf; } - @Override public List<InputSplit> call() throws Exception { ArrayList<InputSplit> splits = new ArrayList<InputSplit>(); @@ -161,39 +162,37 @@ public class ColumnFamilyInputFormat ext } return splits; } - } - private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException - { - // TODO handle failure of range replicas & retry - TSocket socket = new TSocket(range.endpoints.get(0), - DatabaseDescriptor.getThriftPort()); - TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false); - Cassandra.Client client = new Cassandra.Client(binaryProtocol); - try - { - socket.open(); - } - catch (TTransportException e) - { - throw new IOException(e); - } - List<String> splits; - try - { - splits = client.describe_splits(range.start_token, range.end_token, splitsize); - } - catch (TException e) + private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException { - throw new RuntimeException(e); + // TODO handle failure of range replicas & retry + TSocket socket = new TSocket(range.endpoints.get(0), ConfigHelper.getThriftPort(conf)); + TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false); + Cassandra.Client client = new Cassandra.Client(binaryProtocol); + try + { + socket.open(); + } + catch (TTransportException e) + { + throw new IOException(e); + } + List<String> splits; + try + { + splits = client.describe_splits(range.start_token, range.end_token, splitsize); + } + catch (TException e) + { + throw new RuntimeException(e); + } + return splits; } - return splits; } - private List<TokenRange> getRangeMap(String keyspace) throws IOException + private List<TokenRange> getRangeMap(Configuration conf) throws IOException { - TSocket socket = new TSocket(DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(), - DatabaseDescriptor.getThriftPort()); + TSocket socket = new TSocket(ConfigHelper.getInitialAddress(conf), ConfigHelper.getThriftPort(conf)); TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false); Cassandra.Client client = new Cassandra.Client(binaryProtocol); try @@ -207,7 +206,7 @@ public class ColumnFamilyInputFormat ext List<TokenRange> map; try { - map = client.describe_ring(keyspace); + map = client.describe_ring(ConfigHelper.getKeyspace(conf)); } catch (TException e) { @@ -220,7 +219,6 @@ public class ColumnFamilyInputFormat ext return map; } - @Override public RecordReader<String, SortedMap<byte[], IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new ColumnFamilyRecordReader(); Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=964217&r1=964216&r2=964217&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Wed Jul 14 21:58:16 2010 @@ -29,7 +29,6 @@ import java.util.SortedMap; import java.util.TreeMap; import com.google.common.collect.AbstractIterator; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; @@ -55,6 +54,7 @@ public class ColumnFamilyRecordReader ex private int batchRowCount; // fetch this many per batch private String cfName; private String keyspace; + private Configuration conf; public void close() { @@ -81,7 +81,7 @@ public class ColumnFamilyRecordReader ex public void initialize(InputSplit split, TaskAttemptContext context) throws IOException { this.split = (ColumnFamilySplit) split; - Configuration conf = context.getConfiguration(); + conf = context.getConfiguration(); predicate = ConfigHelper.getSlicePredicate(conf); totalRowCount = ConfigHelper.getInputSplitSize(conf); batchRowCount = ConfigHelper.getRangeBatchSize(conf); @@ -100,12 +100,13 @@ public class ColumnFamilyRecordReader ex private class RowIterator extends AbstractIterator<Pair<String, SortedMap<byte[], IColumn>>> { - private List<KeySlice> rows; private String startToken; private int totalRead = 0; private int i = 0; - private AbstractType comparator = DatabaseDescriptor.getComparator(keyspace, cfName); + private AbstractType comparator = ConfigHelper.getComparator(conf); + private AbstractType subComparator = ConfigHelper.getSubComparator(conf); + private IPartitioner partitioner = ConfigHelper.getPartitioner(conf); private TSocket socket; private void maybeInit() @@ -120,8 +121,7 @@ public class ColumnFamilyRecordReader ex // close previous connection if one is open close(); - socket = new TSocket(getLocation(), - DatabaseDescriptor.getThriftPort()); + socket = new TSocket(getLocation(), ConfigHelper.getThriftPort(conf)); TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false); Cassandra.Client client = new Cassandra.Client(binaryProtocol); try @@ -166,8 +166,7 @@ public class ColumnFamilyRecordReader ex // prepare for the next slice to be read KeySlice lastRow = rows.get(rows.size() - 1); - IPartitioner p = DatabaseDescriptor.getPartitioner(); - startToken = p.getTokenFactory().toString(p.getToken(lastRow.getKey())); + startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.getKey())); } catch (Exception e) { @@ -243,28 +242,27 @@ public class ColumnFamilyRecordReader ex socket.close(); } } - } - private IColumn unthriftify(ColumnOrSuperColumn cosc) - { - if (cosc.column == null) - return unthriftifySuper(cosc.super_column); - return unthriftifySimple(cosc.column); - } + private IColumn unthriftify(ColumnOrSuperColumn cosc) + { + if (cosc.column == null) + return unthriftifySuper(cosc.super_column); + return unthriftifySimple(cosc.column); + } - private IColumn unthriftifySuper(SuperColumn super_column) - { - AbstractType subComparator = DatabaseDescriptor.getSubComparator(keyspace, cfName); - org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator); - for (Column column : super_column.columns) + private IColumn unthriftifySuper(SuperColumn super_column) { - sc.addColumn(unthriftifySimple(column)); + org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator); + for (Column column : super_column.columns) + { + sc.addColumn(unthriftifySimple(column)); + } + return sc; } - return sc; - } - private IColumn unthriftifySimple(Column column) - { - return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp); + private IColumn unthriftifySimple(Column column) + { + return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp); + } } } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=964217&r1=964216&r2=964217&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Wed Jul 14 21:58:16 2010 @@ -21,6 +21,10 @@ package org.apache.cassandra.hadoop; */ +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.ThriftValidation; @@ -37,14 +41,36 @@ public class ConfigHelper private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily"; private static final String PREDICATE_CONFIG = "cassandra.input.predicate"; private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size"; - private static final int DEFAULT_SPLIT_SIZE = 64*1024; + private static final int DEFAULT_SPLIT_SIZE = 64 * 1024; private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size"; private static final int DEFAULT_RANGE_BATCH_SIZE = 4096; + private static final String THRIFT_PORT = "cassandra.thrift.port"; + private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address"; + private static final String COMPARATOR = "cassandra.input.comparator"; + private static final String SUB_COMPARATOR = "cassandra.input.subcomparator"; + private static final String PARTITIONER = "cassandra.partitioner"; + + /** + * Set the keyspace, column family, column comparator, and row partitioner for this job. + * + * @param conf Job configuration you are about to run + * @param keyspace + * @param columnFamily + * @param comparator + * @param partitioner + */ + public static void setColumnFamily(Configuration conf, String keyspace, String columnFamily, String comparator, String partitioner) + { + setColumnFamily(conf, keyspace, columnFamily); + conf.set(COMPARATOR, comparator); + conf.set(PARTITIONER, partitioner); + } /** * Set the keyspace and column family for this job. + * Comparator and Partitioner types will be read from storage-conf.xml. * - * @param conf Job configuration you are about to run + * @param conf Job configuration you are about to run * @param keyspace * @param columnFamily */ @@ -71,12 +97,39 @@ public class ConfigHelper } /** + * Set the subcomparator to use in the configured ColumnFamily [of SuperColumns]. + * Optional when storage-conf.xml is provided. + * + * @param conf + * @param subComparator + */ + public static void setSubComparator(Configuration conf, String subComparator) + { + conf.set(SUB_COMPARATOR, subComparator); + } + + /** + * The address and port of a Cassandra node that Hadoop can contact over Thrift + * to learn more about the Cassandra cluster. Optional when storage-conf.xml + * is provided. + * + * @param conf + * @param address + * @param port + */ + public static void setThriftContact(Configuration conf, String address, int port) + { + conf.set(THRIFT_PORT, String.valueOf(port)); + conf.set(INITIAL_THRIFT_ADDRESS, address); + } + + /** * The number of rows to request with each get range slices request. * Too big and you can either get timeouts when it takes Cassandra too * long to fetch all the data. Too small and the performance - * will be eaten up by the overhead of each request. + * will be eaten up by the overhead of each request. * - * @param conf Job configuration you are about to run + * @param conf Job configuration you are about to run * @param batchsize Number of rows to request each time */ public static void setRangeBatchSize(Configuration conf, int batchsize) @@ -88,7 +141,7 @@ public class ConfigHelper * The number of rows to request with each get range slices request. * Too big and you can either get timeouts when it takes Cassandra too * long to fetch all the data. Too small and the performance - * will be eaten up by the overhead of each request. + * will be eaten up by the overhead of each request. * * @param conf Job configuration you are about to run * @return Number of rows to request each time @@ -97,13 +150,13 @@ public class ConfigHelper { return conf.getInt(RANGE_BATCH_SIZE_CONFIG, DEFAULT_RANGE_BATCH_SIZE); } - + /** * Set the size of the input split. * This affects the number of maps created, if the number is too small * the overhead of each map will take up the bulk of the job time. * - * @param conf Job configuration you are about to run + * @param conf Job configuration you are about to run * @param splitsize Size of the input split */ public static void setInputSplitSize(Configuration conf, int splitsize) @@ -119,7 +172,7 @@ public class ConfigHelper /** * Set the predicate that determines what columns will be selected from each row. * - * @param conf Job configuration you are about to run + * @param conf Job configuration you are about to run * @param predicate */ public static void setSlicePredicate(Configuration conf, SlicePredicate predicate) @@ -172,4 +225,38 @@ public class ConfigHelper { return conf.get(COLUMNFAMILY_CONFIG); } + + public static int getThriftPort(Configuration conf) + { + String v = conf.get(THRIFT_PORT); + return v == null ? DatabaseDescriptor.getThriftPort() : Integer.valueOf(v); + } + + public static String getInitialAddress(Configuration conf) + { + String v = conf.get(INITIAL_THRIFT_ADDRESS); + return v == null ? DatabaseDescriptor.getSeeds().iterator().next().getHostAddress() : v; + } + + public static AbstractType getComparator(Configuration conf) + { + String v = conf.get(COMPARATOR); + return v == null + ? DatabaseDescriptor.getComparator(getKeyspace(conf), getColumnFamily(conf)) + : DatabaseDescriptor.getComparator(v); + } + + public static AbstractType getSubComparator(Configuration conf) + { + String v = conf.get(SUB_COMPARATOR); + return v == null + ? DatabaseDescriptor.getSubComparator(getKeyspace(conf), getColumnFamily(conf)) + : DatabaseDescriptor.getComparator(v); + } + + public static IPartitioner getPartitioner(Configuration conf) + { + String v = conf.get(PARTITIONER); + return v == null ? DatabaseDescriptor.getPartitioner() : DatabaseDescriptor.newPartitioner(v); + } }