update configuration checking post-#3197 patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3765
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/27b81838 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/27b81838 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/27b81838 Branch: refs/heads/trunk Commit: 27b818386cdb123691702726f35d3ed6876c8b25 Parents: ee5dbdb Author: Jonathan Ellis <jbel...@apache.org> Authored: Sat Jan 21 15:41:06 2012 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon Jan 23 16:03:48 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + examples/hadoop_word_count/src/WordCount.java | 5 ++++- .../cassandra/hadoop/ColumnFamilyInputFormat.java | 4 ++++ .../cassandra/hadoop/ColumnFamilyOutputFormat.java | 4 ++++ .../org/apache/cassandra/hadoop/ConfigHelper.java | 4 ++-- 5 files changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/27b81838/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1e51f54..72188e5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.1-dev + * (Hadoop) separate input/output configurations (CASSANDRA-3197, 3765) * avoid returning internal Cassandra classes over JMX (CASSANDRA-2805) * add row-level isolation via SnapTree (CASSANDRA-2893) * Optimize key count estimation when opening sstable on startup http://git-wip-us.apache.org/repos/asf/cassandra/blob/27b81838/examples/hadoop_word_count/src/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/src/WordCount.java b/examples/hadoop_word_count/src/WordCount.java index e4bd631..e1c70bb 100644 --- a/examples/hadoop_word_count/src/WordCount.java +++ b/examples/hadoop_word_count/src/WordCount.java @@ -190,7 +190,7 @@ public class WordCount extends Configured implements Tool ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost"); - ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); + ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner"); ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName))); ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate); @@ -200,6 +200,9 @@ public class WordCount extends Configured implements Tool ConfigHelper.setInputRange(job.getConfiguration(), Arrays.asList(expr)); } + ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost"); + ConfigHelper.setOutputPartitioner(job.getConfiguration(), "RandomPartitioner"); + job.waitForCompletion(true); } return 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/27b81838/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index 8abc460..5bef55b 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -100,6 +100,10 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B { throw new UnsupportedOperationException("you must set the predicate with setPredicate"); } + if (ConfigHelper.getInputInitialAddress(conf) == null) + throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node"); + + // input partitioner is optional -- used only if requesting an ordered key scan } public List<InputSplit> getSplits(JobContext context) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/27b81838/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java index 1c0bc66..fda2c22 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java @@ -87,6 +87,10 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat { throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()"); } + if (ConfigHelper.getOutputPartitioner(conf) == null) + throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster"); + if (ConfigHelper.getOutputInitialAddress(conf) == null) + throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node"); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/27b81838/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 9d89a79..810ac80 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -317,7 +317,7 @@ public class ConfigHelper public static int getInputRpcPort(Configuration conf) { - return Integer.parseInt(conf.get(INPUT_THRIFT_PORT)); + return Integer.parseInt(conf.get(INPUT_THRIFT_PORT, "9160")); } public static void setInputRpcPort(Configuration conf, String port) @@ -354,7 +354,7 @@ public class ConfigHelper public static int getOutputRpcPort(Configuration conf) { - return Integer.parseInt(conf.get(OUTPUT_THRIFT_PORT)); + return Integer.parseInt(conf.get(OUTPUT_THRIFT_PORT, "9160")); } public static void setOutputRpcPort(Configuration conf, String port)