Author: jbellis Date: Wed Oct 19 04:22:58 2011 New Revision: 1185965 URL: http://svn.apache.org/viewvc?rev=1185965&view=rev Log: merge from 0.8
Modified: cassandra/branches/cassandra-1.0/ (props changed) cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/contrib/ (props changed) cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java Propchange: cassandra/branches/cassandra-1.0/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Oct 19 04:22:58 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291 /cassandra/branches/cassandra-0.7:1026516-1183000 /cassandra/branches/cassandra-0.7.0:1053690-1055654 -/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963 +/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1185761,1185960-1185961,1185963 /cassandra/branches/cassandra-0.8.0:1125021-1130369 /cassandra/branches/cassandra-0.8.1:1101014-1125018 /cassandra/branches/cassandra-1.0:1167106,1167185 Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Wed Oct 19 04:22:58 2011 @@ -18,6 +18,17 @@ * Fix CLI `show schema` to include "compression_options" (CASSANDRA-3368) * Snapshot to include manifest under LeveledCompactionStrategy (CASSANDRA-3359) * (CQL) SELECT query should allow CF name to be qualified by keyspace (CASSANDRA-3130) + * Display CLI version string on startup (CASSANDRA-3196) + * (Hadoop) make CFIF try rpc_address or fallback to listen_address + (CASSANDRA-3214) + * (Hadoop) accept comma delimited lists of initial thrift connections + (CASSANDRA-3185) + * ColumnFamily min_compaction_threshold should be >= 2 (CASSANDRA-3342) + * (Pig) add 0.8+ types and key validation type in schema (CASSANDRA-3280) + * Fix completely removing column metadata using CLI (CASSANDRA-3126) + * (CQL) Fix internal application error specifying 'using consistency ...' + in lower case (CASSANDRA-3366) + 1.0.0-final * close scrubbed sstable fd before deleting it (CASSANDRA-3318) Propchange: cassandra/branches/cassandra-1.0/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Oct 19 04:22:58 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009 /cassandra/branches/cassandra-0.7/contrib:1026516-1183000 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654 -/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963 +/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1185761,1185960-1185961,1185963 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018 /cassandra/branches/cassandra-1.0/contrib:1167106,1167185 Modified: cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Oct 19 04:22:58 2011 @@ -107,7 +107,7 @@ public class CassandraStorage extends Lo return limit; } - @Override + @Override public Tuple getNext() throws IOException { try @@ -122,7 +122,7 @@ public class CassandraStorage extends Lo assert key != null && cf != null; // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(2); + Tuple tuple = TupleFactory.getInstance().newTuple(2); ArrayList<Tuple> columns = new ArrayList<Tuple>(); tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) @@ -187,10 +187,12 @@ public class CassandraStorage extends Lo ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>(); AbstractType comparator = null; AbstractType default_validator = null; + AbstractType key_validator = null; try { - comparator = TypeParser.parse(cfDef.comparator_type); - default_validator = TypeParser.parse(cfDef.default_validation_class); + comparator = TypeParser.parse(cfDef.getComparator_type()); + default_validator = TypeParser.parse(cfDef.getDefault_validation_class()); + key_validator = TypeParser.parse(cfDef.getKey_validation_class()); } catch (ConfigurationException e) { @@ -199,13 +201,14 @@ public class CassandraStorage extends Lo marshallers.add(comparator); marshallers.add(default_validator); + marshallers.add(key_validator); return marshallers; } - private Map<ByteBuffer,AbstractType> getValidatorMap(CfDef cfDef) throws IOException + private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException { Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>(); - for (ColumnDef cd : cfDef.column_metadata) + for (ColumnDef cd : cfDef.getColumn_metadata()) { if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty()) { @@ -236,6 +239,18 @@ public class CassandraStorage extends Lo this.reader = reader; } + public static Map<String, String> getQueryMap(String query) + { + String[] params = query.split("&"); + Map<String, String> map = new HashMap<String, String>(); + for (String param : params) + { + String[] keyValue = param.split("="); + map.put(keyValue[0], keyValue[1]); + } + return map; + } + private void setLocationFromUri(String location) throws IOException { // parse uri into keyspace and columnfamily @@ -247,18 +262,18 @@ public class CassandraStorage extends Lo String[] urlParts = location.split("\\?"); if (urlParts.length > 1) { - for (String param : urlParts[1].split("&")) - { - String[] pair = param.split("="); - if (pair[0].equals("slice_start")) - slice_start = ByteBufferUtil.bytes(pair[1]); - else if (pair[0].equals("slice_end")) - slice_end = ByteBufferUtil.bytes(pair[1]); - else if (pair[0].equals("reversed")) - slice_reverse = Boolean.parseBoolean(pair[1]); - else if (pair[0].equals("limit")) - limit = Integer.parseInt(pair[1]); - } + Map<String, String> urlQuery = getQueryMap(urlParts[1]); + AbstractType comparator = BytesType.instance; + if (urlQuery.containsKey("comparator")) + comparator = TypeParser.parse(urlQuery.get("comparator")); + if (urlQuery.containsKey("slice_start")) + slice_start = comparator.fromString(urlQuery.get("slice_start")); + if (urlQuery.containsKey("slice_end")) + slice_end = comparator.fromString(urlQuery.get("slice_end")); + if (urlQuery.containsKey("reversed")) + slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed")); + if (urlQuery.containsKey("limit")) + limit = Integer.parseInt(urlQuery.get("limit")); } String[] parts = urlParts[0].split("/+"); keyspace = parts[1]; @@ -312,10 +327,14 @@ public class CassandraStorage extends Lo // top-level schema, no type ResourceSchema schema = new ResourceSchema(); + // get default marshallers and validators + List<AbstractType> marshallers = getDefaultMarshallers(cfDef); + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + // add key ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema(); keyFieldSchema.setName("key"); - keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type + keyFieldSchema.setType(getPigType(marshallers.get(2))); // will become the bag of tuples ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema(); @@ -323,9 +342,6 @@ public class CassandraStorage extends Lo bagFieldSchema.setType(DataType.BAG); ResourceSchema bagSchema = new ResourceSchema(); - - List<AbstractType> marshallers = getDefaultMarshallers(cfDef); - Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>(); // default comparator/validator @@ -381,6 +397,10 @@ public class CassandraStorage extends Lo return DataType.CHARARRAY; else if (type instanceof UTF8Type) return DataType.CHARARRAY; + else if (type instanceof FloatType) + return DataType.FLOAT; + else if (type instanceof DoubleType) + return DataType.DOUBLE; return DataType.BYTEARRAY; } @@ -545,7 +565,7 @@ public class CassandraStorage extends Lo Cassandra.Client client = null; try { - client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true); + client = ConfigHelper.getClientFromAddressList(conf); CfDef cfDef = null; client.set_keyspace(keyspace); KsDef ksDef = client.describe_keyspace(keyspace); @@ -579,21 +599,6 @@ public class CassandraStorage extends Lo } } - private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException - { - TSocket socket = new TSocket(host, port); - TTransport trans = framed ? new TFramedTransport(socket) : socket; - try - { - trans.open(); - } - catch (TTransportException e) - { - throw new IOException("unable to connect to server", e); - } - return new Cassandra.Client(new TBinaryProtocol(trans)); - } - private static String cfdefToString(CfDef cfDef) { assert cfDef != null; Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Oct 19 04:22:58 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1183000 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167106,1167185 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Oct 19 04:22:58 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1183000 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167106,1167185 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Oct 19 04:22:58 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1183000 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167106,1167185 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Oct 19 04:22:58 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1183000 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167106,1167185 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Oct 19 04:22:58 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1183000 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167106,1167185 Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g Wed Oct 19 04:22:58 2011 @@ -401,8 +401,8 @@ attrValue arrayConstruct - : '[' (hashConstruct ','?)+ ']' - -> ^(ARRAY (hashConstruct)+) + : '[' (hashConstruct ','?)* ']' + -> ^(ARRAY (hashConstruct)*) ; hashConstruct Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java Wed Oct 19 04:22:58 2011 @@ -189,6 +189,7 @@ public class CliClient public void printBanner() { sessionState.out.println(getHelp().banner); + sessionState.out.println("Cassandra CLI version " + FBUtilities.getReleaseVersionString()); } // Execute a CLI Statement Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java Wed Oct 19 04:22:58 2011 @@ -21,25 +21,22 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.HashSet; import java.util.List; -import java.util.Set; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.TokenRange; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TException; -import org.apache.cassandra.thrift.TBinaryProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; /** * A class for caching the ring map at the client. For usage example, see @@ -50,42 +47,32 @@ public class RingCache { final private static Logger logger_ = LoggerFactory.getLogger(RingCache.class); - private final Set<String> seeds_ = new HashSet<String>(); - private final int port_; - private final IPartitioner<?> partitioner_; - private final String keyspace; + private final IPartitioner<?> partitioner; + private final Configuration conf; private Multimap<Range, InetAddress> rangeMap; - public RingCache(String keyspace, IPartitioner<?> partitioner, String addresses, int port) throws IOException + public RingCache(Configuration conf) throws IOException { - for (String seed : addresses.split(",")) - seeds_.add(seed); - this.port_ = port; - this.keyspace = keyspace; - this.partitioner_ = partitioner; + this.conf = conf; + this.partitioner = ConfigHelper.getPartitioner(conf); refreshEndpointMap(); } public void refreshEndpointMap() { - for (String seed : seeds_) - { - try - { - TSocket socket = new TSocket(seed, port_); - TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket)); - Cassandra.Client client = new Cassandra.Client(binaryProtocol); - socket.open(); + try { + + Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf); - List<TokenRange> ring = client.describe_ring(keyspace); + List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf)); rangeMap = ArrayListMultimap.create(); for (TokenRange range : ring) { - Token<?> left = partitioner_.getTokenFactory().fromString(range.start_token); - Token<?> right = partitioner_.getTokenFactory().fromString(range.end_token); - Range r = new Range(left, right, partitioner_); + Token<?> left = partitioner.getTokenFactory().fromString(range.start_token); + Token<?> right = partitioner.getTokenFactory().fromString(range.end_token); + Range r = new Range(left, right, partitioner); for (String host : range.endpoints) { try @@ -98,19 +85,20 @@ public class RingCache } } } - break; } catch (InvalidRequestException e) { throw new RuntimeException(e); } + catch (IOException e) + { + throw new RuntimeException(e); + } catch (TException e) { - /* let the Exception go and try another seed. log this though */ - logger_.debug("Error contacting seed " + seed + " " + e.getMessage()); + logger_.debug("Error contacting seed list" + ConfigHelper.getInitialAddress(conf) + " " + e.getMessage()); } } - } /** ListMultimap promises to return a List for get(K) */ public List<InetAddress> getEndpoint(Range range) @@ -126,7 +114,7 @@ public class RingCache public Range getRange(ByteBuffer key) { // TODO: naive linear search of the token map - Token<?> t = partitioner_.getToken(key); + Token<?> t = partitioner.getToken(key); for (Range range : rangeMap.keySet()) if (range.contains(t)) return range; Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g Wed Oct 19 04:22:58 2011 @@ -148,7 +148,7 @@ selectStatement returns [SelectStatement | K_COUNT '(' s2=selectExpression ')' { expression = s2; isCountOp = true; } ) K_FROM (keyspace=(IDENT | STRING_LITERAL | INTEGER) '.')? columnFamily=( IDENT | STRING_LITERAL | INTEGER ) - ( K_USING K_CONSISTENCY K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text); } )? + ( K_USING K_CONSISTENCY K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase()); } )? ( K_WHERE whereClause )? ( K_LIMIT rows=INTEGER { numRecords = Integer.parseInt($rows.text); } )? endStmnt @@ -231,7 +231,7 @@ usingClauseDelete[Attributes attrs] ; usingClauseDeleteObjective[Attributes attrs] - : K_CONSISTENCY K_LEVEL { attrs.setConsistencyLevel(ConsistencyLevel.valueOf($K_LEVEL.text)); } + : K_CONSISTENCY K_LEVEL { attrs.setConsistencyLevel(ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase())); } | K_TIMESTAMP ts=INTEGER { attrs.setTimestamp(Long.valueOf($ts.text)); } ; Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Wed Oct 19 04:22:58 2011 @@ -24,16 +24,17 @@ package org.apache.cassandra.hadoop; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.SortedMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.db.IColumn; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -41,14 +42,16 @@ import org.apache.cassandra.thrift.Cassa import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.KeyRange; import org.apache.cassandra.thrift.TokenRange; -import org.apache.cassandra.thrift.TBinaryProtocol; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.thrift.TException; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. @@ -188,13 +191,17 @@ public class ColumnFamilyInputFormat ext { ArrayList<InputSplit> splits = new ArrayList<InputSplit>(); List<String> tokens = getSubSplits(keyspace, cfName, range, conf); - + assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size"; // turn the sub-ranges into InputSplits String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]); // hadoop needs hostname, not ip - for (int i = 0; i < endpoints.length; i++) + int endpointIndex = 0; + for (String endpoint: range.rpc_endpoints) { - endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName(); + String endpoint_address = endpoint; + if(endpoint_address == null || endpoint_address == "0.0.0.0") + endpoint_address = range.endpoints.get(endpointIndex); + endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName(); } for (int i = 1; i < tokens.size(); i++) @@ -210,11 +217,11 @@ public class ColumnFamilyInputFormat ext private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException { int splitsize = ConfigHelper.getInputSplitSize(conf); - for (String host : range.endpoints) + for (String host : range.rpc_endpoints) { try { - Cassandra.Client client = createConnection(host, ConfigHelper.getRpcPort(conf), true); + Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getRpcPort(conf), true); client.set_keyspace(keyspace); return client.describe_splits(cfName, range.start_token, range.end_token, splitsize); } @@ -234,47 +241,10 @@ public class ColumnFamilyInputFormat ext throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ",")); } - private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException - { - TSocket socket = new TSocket(host, port); - TTransport trans = framed ? new TFramedTransport(socket) : socket; - try - { - trans.open(); - } - catch (TTransportException e) - { - throw new IOException("unable to connect to server", e); - } - return new Cassandra.Client(new TBinaryProtocol(trans)); - } private List<TokenRange> getRangeMap(Configuration conf) throws IOException { - String[] addresses = ConfigHelper.getInitialAddress(conf).split(","); - Cassandra.Client client = null; - List<IOException> exceptions = new ArrayList<IOException>(); - for (String address : addresses) - { - try - { - client = createConnection(address, ConfigHelper.getRpcPort(conf), true); - break; - } - catch (IOException ioe) - { - exceptions.add(ioe); - } - } - if (client == null) - { - logger.error("failed to connect to any initial addresses"); - for (IOException ioe : exceptions) - { - logger.error("", ioe); - } - throw exceptions.get(exceptions.size() - 1); - } + Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf); List<TokenRange> map; try @@ -292,6 +262,8 @@ public class ColumnFamilyInputFormat ext return map; } + + public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new ColumnFamilyRecordReader(); Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Wed Oct 19 04:22:58 2011 @@ -95,10 +95,7 @@ implements org.apache.hadoop.mapred.Reco ColumnFamilyRecordWriter(Configuration conf) throws IOException { this.conf = conf; - this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(conf), - ConfigHelper.getPartitioner(conf), - ConfigHelper.getInitialAddress(conf), - ConfigHelper.getRpcPort(conf)); + this.ringCache = new RingCache(conf); this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors()); this.clients = new HashMap<Range,RangeClient>(); batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Wed Oct 19 04:22:58 2011 @@ -19,9 +19,13 @@ package org.apache.cassandra.hadoop; * under the License. * */ +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.KeyRange; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.TBinaryProtocol; @@ -31,6 +35,13 @@ import org.apache.hadoop.conf.Configurat import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ConfigHelper { @@ -54,6 +65,9 @@ public class ConfigHelper private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address"; private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read"; private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; + + private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); + /** * Set the keyspace and column family for the input of this job. @@ -331,4 +345,50 @@ public class ConfigHelper throw new RuntimeException(e); } } + + + public static Cassandra.Client getClientFromAddressList(Configuration conf) throws IOException + { + String[] addresses = ConfigHelper.getInitialAddress(conf).split(","); + Cassandra.Client client = null; + List<IOException> exceptions = new ArrayList<IOException>(); + for (String address : addresses) + { + try + { + client = createConnection(address, ConfigHelper.getRpcPort(conf), true); + break; + } + catch (IOException ioe) + { + exceptions.add(ioe); + } + } + if (client == null) + { + logger.error("failed to connect to any initial addresses"); + for (IOException ioe : exceptions) + { + logger.error("", ioe); + } + throw exceptions.get(exceptions.size() - 1); + } + return client; + } + + public static Cassandra.Client createConnection(String host, Integer port, boolean framed) + throws IOException + { + TSocket socket = new TSocket(host, port); + TTransport trans = framed ? new TFramedTransport(socket) : socket; + try + { + trans.open(); + } + catch (TTransportException e) + { + throw new IOException("unable to connect to server", e); + } + return new Cassandra.Client(new TBinaryProtocol(trans)); + } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java Wed Oct 19 04:22:58 2011 @@ -691,19 +691,11 @@ public class ThriftValidation { if (cf_def.isSetMin_compaction_threshold() && cf_def.isSetMax_compaction_threshold()) { - if ((cf_def.min_compaction_threshold > cf_def.max_compaction_threshold) - && cf_def.max_compaction_threshold != 0) - { - throw new ConfigurationException("min_compaction_threshold cannot be greater than max_compaction_threshold"); - } + validateMinCompactionThreshold(cf_def.min_compaction_threshold, cf_def.max_compaction_threshold); } else if (cf_def.isSetMin_compaction_threshold()) { - if (cf_def.min_compaction_threshold > CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD) - { - throw new ConfigurationException(String.format("min_compaction_threshold cannot be greather than max_compaction_threshold (default %d)", - CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD)); - } + validateMinCompactionThreshold(cf_def.min_compaction_threshold, CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD); } else if (cf_def.isSetMax_compaction_threshold()) { @@ -718,6 +710,26 @@ public class ThriftValidation } } + public static void validateMinCompactionThreshold(int min_compaction_threshold, int max_compaction_threshold) throws ConfigurationException + { + if (min_compaction_threshold <= 1) + throw new ConfigurationException("min_compaction_threshold cannot be less than 2."); + + if (min_compaction_threshold > max_compaction_threshold && max_compaction_threshold != 0) + throw new ConfigurationException(String.format("min_compaction_threshold cannot be greater than max_compaction_threshold %d", + max_compaction_threshold)); + } + + public static void validateMemtableSettings(org.apache.cassandra.thrift.CfDef cf_def) throws ConfigurationException + { + if (cf_def.isSetMemtable_flush_after_mins()) + DatabaseDescriptor.validateMemtableFlushPeriod(cf_def.memtable_flush_after_mins); + if (cf_def.isSetMemtable_throughput_in_mb()) + DatabaseDescriptor.validateMemtableThroughput(cf_def.memtable_throughput_in_mb); + if (cf_def.isSetMemtable_operations_in_millions()) + DatabaseDescriptor.validateMemtableOperations(cf_def.memtable_operations_in_millions); + } + public static void validateKeyspaceNotYetExisting(String newKsName) throws InvalidRequestException { // keyspace names must be unique case-insensitively because the keyspace name becomes the directory Modified: cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java (original) +++ cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java Wed Oct 19 04:22:58 2011 @@ -301,6 +301,8 @@ public abstract class TestBase protected List<InetAddress> endpointsForKey(InetAddress seed, ByteBuffer key, String keyspace) throws IOException { + Configuration conf = new Configuration(); + RingCache ring = new RingCache(keyspace, new RandomPartitioner(), seed.getHostAddress(), 9160); List<InetAddress> privateendpoints = ring.getEndpoint(key); List<InetAddress> endpoints = new ArrayList<InetAddress>(); Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java (original) +++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java Wed Oct 19 04:22:58 2011 @@ -158,6 +158,7 @@ public class CliTest extends CleanupHelp "drop column family cF8;", "create keyspace TESTIN;", "drop keyspace tesTIN;", + "update column family 123 with comparator=UTF8Type and column_metadata=[];", "drop column family 123;", "create column family myCF with column_type='Super' and comparator='UTF8Type' AND subcomparator='UTF8Type';", "assume myCF keys as utf8;", Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=1185965&r1=1185964&r2=1185965&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java (original) +++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java Wed Oct 19 04:22:58 2011 @@ -23,16 +23,18 @@ import java.nio.ByteBuffer; import java.util.Collection; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.ColumnPath; import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; -import org.apache.cassandra.utils.ByteBufferUtil; /** @@ -42,11 +44,12 @@ public class TestRingCache { private RingCache ringCache; private Cassandra.Client thriftClient; + private Configuration conf; public TestRingCache(String keyspace) throws IOException { - String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(); - ringCache = new RingCache(keyspace, DatabaseDescriptor.getPartitioner(), seed, DatabaseDescriptor.getRpcPort()); + ConfigHelper.setOutputColumnFamily(conf, keyspace, "Standard1"); + ringCache = new RingCache(conf); } private void setup(String server, int port) throws Exception @@ -58,6 +61,12 @@ public class TestRingCache Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol); socket.open(); thriftClient = cassandraClient; + String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(); + conf = new Configuration(); + ConfigHelper.setPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName()); + ConfigHelper.setInitialAddress(conf, seed); + ConfigHelper.setRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort())); + } /**