Author: brandonwilliams Date: Tue Mar 15 17:22:45 2011 New Revision: 1081867
URL: http://svn.apache.org/viewvc?rev=1081867&view=rev Log: Allow Hadoop jobs to set the consistency level. Patch by Eldon Stegall, reviewed by brandonwilliams for CASSANDRA-2331 Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1081867&r1=1081866&r2=1081867&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Mar 15 17:22:45 2011 @@ -3,6 +3,7 @@ * fix tombstone handling in repair and sstable2json (CASSANDRA-2279) * clear Built flag in system table when dropping an index (CASSANDRA-2320) * validate index names (CASSANDRA-1761) + * allow job configuration to set the CL used in Hadoop jobs (CASSANDRA-2331) 0.7.4 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1081867&r1=1081866&r2=1081867&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Mar 15 17:22:45 2011 @@ -57,6 +57,7 @@ public class ColumnFamilyRecordReader ex private String keyspace; private TSocket socket; private Cassandra.Client client; + private ConsistencyLevel consistencyLevel; public void close() { @@ -92,6 +93,9 @@ public class ColumnFamilyRecordReader ex totalRowCount = ConfigHelper.getInputSplitSize(conf); batchRowCount = ConfigHelper.getRangeBatchSize(conf); cfName = ConfigHelper.getInputColumnFamily(conf); + consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf)); + + keyspace = ConfigHelper.getInputKeyspace(conf); try @@ -238,7 +242,7 @@ public class ColumnFamilyRecordReader ex rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, - ConsistencyLevel.ONE); + consistencyLevel); // nothing new? reached the end if (rows.isEmpty()) Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1081867&r1=1081866&r2=1081867&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Mar 15 17:22:45 2011 @@ -78,6 +78,9 @@ implements org.apache.hadoop.mapred.Reco // handles for clients for each range running in the threadpool private final Map<Range,RangeClient> clients; private final long batchThreshold; + + private final ConsistencyLevel consistencyLevel; + /** * Upon construction, obtain the map that this writer will use to collect @@ -101,6 +104,7 @@ implements org.apache.hadoop.mapred.Reco this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors()); this.clients = new HashMap<Range,RangeClient>(); batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); + consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf)); } /** @@ -347,7 +351,7 @@ implements org.apache.hadoop.mapred.Reco // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. try { - thriftClient.batch_mutate(batch, ConsistencyLevel.ONE); + thriftClient.batch_mutate(batch, consistencyLevel); break; } catch (Exception e) Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1081867&r1=1081866&r2=1081867&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Tue Mar 15 17:22:45 2011 @@ -49,6 +49,8 @@ public class ConfigHelper private static final int DEFAULT_RANGE_BATCH_SIZE = 4096; private static final String THRIFT_PORT = "cassandra.thrift.port"; private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address"; + private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read"; + private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; /** * Set the keyspace and column family for the input of this job. @@ -222,12 +224,22 @@ public class ConfigHelper { return conf.get(INPUT_COLUMNFAMILY_CONFIG); } - + public static String getOutputColumnFamily(Configuration conf) { return conf.get(OUTPUT_COLUMNFAMILY_CONFIG); } + public static String getReadConsistencyLevel(Configuration conf) + { + return conf.get(READ_CONSISTENCY_LEVEL, "ONE"); + } + + public static String getWriteConsistencyLevel(Configuration conf) + { + return conf.get(WRITE_CONSISTENCY_LEVEL, "ONE"); + } + public static int getRpcPort(Configuration conf) { return Integer.parseInt(conf.get(THRIFT_PORT));