Support multiple outputs in BOF (BulkOutputFormat). Patch by Michael Kjellman, reviewed by brandonwilliams for CASSANDRA-4912.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/edf63d8a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/edf63d8a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/edf63d8a Branch: refs/heads/cassandra-1.2.0 Commit: edf63d8a5110aaaa1efa708343d1dce071d031cd Parents: 3957df1 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Mon Nov 19 14:25:41 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Mon Nov 19 14:25:41 2012 -0600 ---------------------------------------------------------------------- .../apache/cassandra/hadoop/BulkOutputFormat.java | 4 ++-- .../apache/cassandra/hadoop/BulkRecordWriter.java | 15 ++++++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/edf63d8a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java index 7a6a1d7..f1c5f39 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java @@ -37,9 +37,9 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> private void checkOutputSpecs(Configuration conf) { - if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null) + if (ConfigHelper.getOutputKeyspace(conf) == null) { - throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()"); + throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()"); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edf63d8a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index 4883743..3f70ca5 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -60,7 +60,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class); private SSTableSimpleUnsortedWriter writer; private SSTableLoader loader; - private final File outputdir; + private File outputdir; private Progressable progress; private int maxFailures; @@ -99,9 +99,6 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> this.conf = conf; DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0")); - String keyspace = ConfigHelper.getOutputKeyspace(conf); - outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); //dir must be named by ks/cf for the loader - outputdir.mkdirs(); } private String getOutputLocation() throws IOException @@ -127,8 +124,16 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> } } - private void prepareWriter() + private void prepareWriter() throws IOException { + if (outputdir == null) + { + String keyspace = ConfigHelper.getOutputKeyspace(conf); + //dir must be named by ks/cf for the loader + outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); + outputdir.mkdirs(); + } + if (writer == null) { AbstractType<?> subcomparator = null;