[ https://issues.apache.org/jira/browse/CASSANDRA-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13493577#comment-13493577 ]
Michael Kjellman commented on CASSANDRA-4912:
---------------------------------------------

So when checkOutputSpecs() is called in local mode while the job is being set up, no exception is thrown. When a reducer is created, however, org.apache.cassandra.hadoop.ConfigHelper.getOutputColumnFamily throws an UnsupportedOperationException because the output column family isn't set up. It looks like mapreduce.output.basename is null. The job config is something along the lines of:

{code:java}
public int run(String[] args) throws Exception
{
    Job job = new Job(getConf(), "Nashoba");
    job.setJarByClass(Nashoba.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(ReducerToCassandra.class);
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    // setup 3 reducers
    job.setNumReduceTasks(3);

    // thrift input job settings
    ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");

    // thrift output job settings
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "RandomPartitioner");

    // set timeout to 1 hour for testing
    job.getConfiguration().set("mapreduce.task.timeout", "3600000");
    job.getConfiguration().set("mapred.task.timeout", "3600000");

    job.getConfiguration().set("mapreduce.output.bulkoutputformat.buffersize", "64");
    job.setOutputFormatClass(BulkOutputFormat.class);
    ConfigHelper.setRangeBatchSize(getConf(), 99);

    // let ConfigHelper know what column family to get data from and where to output it
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY);
    ConfigHelper.setOutputKeyspace(job.getConfiguration(), KEYSPACE);
    MultipleOutputs.addNamedOutput(job, OUTPUT_COLUMN_FAMILY1, BulkOutputFormat.class, ByteBuffer.class, List.class);
    MultipleOutputs.addNamedOutput(job, OUTPUT_COLUMN_FAMILY2, BulkOutputFormat.class, ByteBuffer.class, List.class);

    // what classes the mapper will write and what the consumer should expect to receive
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(MapWritable.class);
    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);

    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    SlicePredicate predicate = new SlicePredicate();
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

    job.waitForCompletion(true);
    return 0;
}

public static class ReducerToCassandra extends Reducer<Text, MapWritable, ByteBuffer, List<Mutation>>
{
    private MultipleOutputs<ByteBuffer, List<Mutation>> output;

    @Override
    public void setup(Context context)
    {
        output = new MultipleOutputs<ByteBuffer, List<Mutation>>(context);
    }

    public void reduce(Text word, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException
    {
        // do stuff in reducer...

        // write out our result to Hadoop
        context.progress();

        // for writing to 2 column families
        output.write(OUTPUT_COLUMN_FAMILY1, key, Collections.singletonList(getMutation1(word, val)));
        output.write(OUTPUT_COLUMN_FAMILY2, key, Collections.singletonList(getMutation2(word, val)));
    }

    public void cleanup(Context context) throws IOException, InterruptedException
    {
        output.close(); // closes all of the opened outputs
    }
}
{code}

> BulkOutputFormat should support Hadoop MultipleOutput
> -----------------------------------------------------
>
>                 Key: CASSANDRA-4912
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-4912
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Hadoop
>    Affects Versions: 1.2.0 beta 1
>            Reporter: Michael Kjellman
>
> Much like CASSANDRA-4208, BOF should support outputting to multiple column families. The current approach taken in the patch for COF results in only one stream being sent.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira
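The failure mode described in the comment can be sketched minimally. This is a hypothetical stand-in, not the actual Cassandra source: a plain HashMap replaces Hadoop's Configuration, and the key name is invented. It shows why a reducer-side helper that reads a single output column family from the job configuration throws when only MultipleOutputs named outputs were registered and that key was never set.

```java
import java.util.HashMap;
import java.util.Map;

public class OutputCfLookupSketch {
    // Hypothetical stand-in for the single conf key the helper reads.
    static final String OUTPUT_CF_KEY = "cassandra.output.columnfamily";

    // Mirrors the failing lookup: throws when the key was never set,
    // which is the case when output CFs exist only as named outputs.
    static String getOutputColumnFamily(Map<String, String> conf) {
        String cf = conf.get(OUTPUT_CF_KEY);
        if (cf == null)
            throw new UnsupportedOperationException(
                "output column family not set in the job configuration");
        return cf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Setting only the keyspace, as in the job config above,
        // leaves the column-family key null.
        conf.put("cassandra.output.keyspace", "ks");
        try {
            getOutputColumnFamily(conf);
            System.out.println("no exception");
        } catch (UnsupportedOperationException e) {
            System.out.println("UnsupportedOperationException");
        }
    }
}
// prints "UnsupportedOperationException"
```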