Author: brandonwilliams Date: Fri Oct 14 21:22:14 2011 New Revision: 1183506
URL: http://svn.apache.org/viewvc?rev=1183506&view=rev Log: Unify hadoop support for accepting a comma-delimited list (CDL) of initial thrift addresses. Patch by Eldon Stegall, reviewed by brandonwilliams for CASSANDRA-3185 Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1183506&r1=1183505&r2=1183506&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Oct 14 21:22:14 2011 @@ -12,6 +12,8 @@ successfully acquired the compaction lock (CASSANDRA-3344) * (Hadoop) make CFIF try rpc_address or fallback to listen_address (CASSANDRA-3214) + * (Hadoop) accept comma delimited lists of initial thrift connections + (CASSANDRA-3185) 0.8.7 Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1183506&r1=1183505&r2=1183506&view=diff ============================================================================== --- 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Oct 14 21:22:14 2011 @@ -545,7 +545,7 @@ public class CassandraStorage extends Lo Cassandra.Client client = null; try { - client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true); + client = ConfigHelper.getClientFromAddressList(conf); CfDef cfDef = null; client.set_keyspace(keyspace); KsDef ksDef = client.describe_keyspace(keyspace); @@ -579,21 +579,6 @@ public class CassandraStorage extends Lo } } - private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException - { - TSocket socket = new TSocket(host, port); - TTransport trans = framed ? new TFramedTransport(socket) : socket; - try - { - trans.open(); - } - catch (TTransportException e) - { - throw new IOException("unable to connect to server", e); - } - return new Cassandra.Client(new TBinaryProtocol(trans)); - } - private static String cfdefToString(CfDef cfDef) { assert cfDef != null; Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java?rev=1183506&r1=1183505&r2=1183506&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java Fri Oct 14 21:22:14 2011 @@ -21,25 +21,22 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.HashSet; import java.util.List; -import java.util.Set; - -import 
com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.TokenRange; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TException; -import org.apache.cassandra.thrift.TBinaryProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; /** * A class for caching the ring map at the client. For usage example, see @@ -50,42 +47,32 @@ public class RingCache { final private static Logger logger_ = LoggerFactory.getLogger(RingCache.class); - private final Set<String> seeds_ = new HashSet<String>(); - private final int port_; - private final IPartitioner<?> partitioner_; - private final String keyspace; + private final IPartitioner<?> partitioner; + private final Configuration conf; private Multimap<Range, InetAddress> rangeMap; - public RingCache(String keyspace, IPartitioner<?> partitioner, String addresses, int port) throws IOException + public RingCache(Configuration conf) throws IOException { - for (String seed : addresses.split(",")) - seeds_.add(seed); - this.port_ = port; - this.keyspace = keyspace; - this.partitioner_ = partitioner; + this.conf = conf; + this.partitioner = ConfigHelper.getPartitioner(conf); refreshEndpointMap(); } public void refreshEndpointMap() { - for (String seed : seeds_) - { - try - { - TSocket socket = new TSocket(seed, port_); - TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket)); - 
Cassandra.Client client = new Cassandra.Client(binaryProtocol); - socket.open(); + try { + + Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf); - List<TokenRange> ring = client.describe_ring(keyspace); + List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf)); rangeMap = ArrayListMultimap.create(); for (TokenRange range : ring) { - Token<?> left = partitioner_.getTokenFactory().fromString(range.start_token); - Token<?> right = partitioner_.getTokenFactory().fromString(range.end_token); - Range r = new Range(left, right, partitioner_); + Token<?> left = partitioner.getTokenFactory().fromString(range.start_token); + Token<?> right = partitioner.getTokenFactory().fromString(range.end_token); + Range r = new Range(left, right, partitioner); for (String host : range.endpoints) { try @@ -98,19 +85,20 @@ public class RingCache } } } - break; } catch (InvalidRequestException e) { throw new RuntimeException(e); } + catch (IOException e) + { + throw new RuntimeException(e); + } catch (TException e) { - /* let the Exception go and try another seed. 
log this though */ - logger_.debug("Error contacting seed " + seed + " " + e.getMessage()); + logger_.debug("Error contacting seed list" + ConfigHelper.getInitialAddress(conf) + " " + e.getMessage()); } } - } /** ListMultimap promises to return a List for get(K) */ public List<InetAddress> getEndpoint(Range range) @@ -126,7 +114,7 @@ public class RingCache public Range getRange(ByteBuffer key) { // TODO: naive linear search of the token map - Token<?> t = partitioner_.getToken(key); + Token<?> t = partitioner.getToken(key); for (Range range : rangeMap.keySet()) if (range.contains(t)) return range; Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1183506&r1=1183505&r2=1183506&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Oct 14 21:22:14 2011 @@ -24,16 +24,17 @@ package org.apache.cassandra.hadoop; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.SortedMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.db.IColumn; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -41,14 +42,16 @@ import org.apache.cassandra.thrift.Cassa import 
org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.KeyRange; import org.apache.cassandra.thrift.TokenRange; -import org.apache.cassandra.thrift.TBinaryProtocol; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.thrift.TException; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. @@ -218,7 +221,7 @@ public class ColumnFamilyInputFormat ext { try { - Cassandra.Client client = createConnection(host, ConfigHelper.getRpcPort(conf), true); + Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getRpcPort(conf), true); client.set_keyspace(keyspace); return client.describe_splits(cfName, range.start_token, range.end_token, splitsize); } @@ -238,47 +241,10 @@ public class ColumnFamilyInputFormat ext throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ",")); } - private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException - { - TSocket socket = new TSocket(host, port); - TTransport trans = framed ? 
new TFramedTransport(socket) : socket; - try - { - trans.open(); - } - catch (TTransportException e) - { - throw new IOException("unable to connect to server", e); - } - return new Cassandra.Client(new TBinaryProtocol(trans)); - } private List<TokenRange> getRangeMap(Configuration conf) throws IOException { - String[] addresses = ConfigHelper.getInitialAddress(conf).split(","); - Cassandra.Client client = null; - List<IOException> exceptions = new ArrayList<IOException>(); - for (String address : addresses) - { - try - { - client = createConnection(address, ConfigHelper.getRpcPort(conf), true); - break; - } - catch (IOException ioe) - { - exceptions.add(ioe); - } - } - if (client == null) - { - logger.error("failed to connect to any initial addresses"); - for (IOException ioe : exceptions) - { - logger.error("", ioe); - } - throw exceptions.get(exceptions.size() - 1); - } + Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf); List<TokenRange> map; try @@ -296,6 +262,8 @@ public class ColumnFamilyInputFormat ext return map; } + + public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new ColumnFamilyRecordReader(); Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1183506&r1=1183505&r2=1183506&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Fri Oct 14 21:22:14 2011 @@ -95,10 +95,7 @@ implements org.apache.hadoop.mapred.Reco ColumnFamilyRecordWriter(Configuration 
conf) throws IOException { this.conf = conf; - this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(conf), - ConfigHelper.getPartitioner(conf), - ConfigHelper.getInitialAddress(conf), - ConfigHelper.getRpcPort(conf)); + this.ringCache = new RingCache(conf); this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors()); this.clients = new HashMap<Range,RangeClient>(); batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1183506&r1=1183505&r2=1183506&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Fri Oct 14 21:22:14 2011 @@ -19,9 +19,13 @@ package org.apache.cassandra.hadoop; * under the License. 
* */ +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.KeyRange; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.TBinaryProtocol; @@ -30,6 +34,13 @@ import org.apache.hadoop.conf.Configurat import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ConfigHelper { @@ -53,6 +64,9 @@ public class ConfigHelper private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address"; private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read"; private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; + + private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); + /** * Set the keyspace and column family for the input of this job. 
@@ -330,4 +344,50 @@ public class ConfigHelper throw new RuntimeException(e); } } + + + public static Cassandra.Client getClientFromAddressList(Configuration conf) throws IOException + { + String[] addresses = ConfigHelper.getInitialAddress(conf).split(","); + Cassandra.Client client = null; + List<IOException> exceptions = new ArrayList<IOException>(); + for (String address : addresses) + { + try + { + client = createConnection(address, ConfigHelper.getRpcPort(conf), true); + break; + } + catch (IOException ioe) + { + exceptions.add(ioe); + } + } + if (client == null) + { + logger.error("failed to connect to any initial addresses"); + for (IOException ioe : exceptions) + { + logger.error("", ioe); + } + throw exceptions.get(exceptions.size() - 1); + } + return client; + } + + public static Cassandra.Client createConnection(String host, Integer port, boolean framed) + throws IOException + { + TSocket socket = new TSocket(host, port); + TTransport trans = framed ? new TFramedTransport(socket) : socket; + try + { + trans.open(); + } + catch (TTransportException e) + { + throw new IOException("unable to connect to server", e); + } + return new Cassandra.Client(new TBinaryProtocol(trans)); + } } Modified: cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java?rev=1183506&r1=1183505&r2=1183506&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java (original) +++ cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java Fri Oct 14 21:22:14 2011 @@ -301,6 +301,8 @@ public abstract class TestBase protected List<InetAddress> endpointsForKey(InetAddress seed, ByteBuffer key, String keyspace) throws IOException { + Configuration conf = new Configuration(); + RingCache ring = new 
RingCache(keyspace, new RandomPartitioner(), seed.getHostAddress(), 9160); List<InetAddress> privateendpoints = ring.getEndpoint(key); List<InetAddress> endpoints = new ArrayList<InetAddress>(); Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=1183506&r1=1183505&r2=1183506&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java Fri Oct 14 21:22:14 2011 @@ -23,16 +23,18 @@ import java.nio.ByteBuffer; import java.util.Collection; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.ColumnPath; import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; -import org.apache.cassandra.utils.ByteBufferUtil; /** @@ -42,11 +44,12 @@ public class TestRingCache { private RingCache ringCache; private Cassandra.Client thriftClient; + private Configuration conf; public TestRingCache(String keyspace) throws IOException { - String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(); - ringCache = new RingCache(keyspace, DatabaseDescriptor.getPartitioner(), seed, DatabaseDescriptor.getRpcPort()); + ConfigHelper.setOutputColumnFamily(conf, keyspace, "Standard1"); 
+ ringCache = new RingCache(conf); } private void setup(String server, int port) throws Exception @@ -58,6 +61,12 @@ public class TestRingCache Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol); socket.open(); thriftClient = cassandraClient; + String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(); + conf = new Configuration(); + ConfigHelper.setPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName()); + ConfigHelper.setInitialAddress(conf, seed); + ConfigHelper.setRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort())); + } /**