Author: brandonwilliams Date: Fri Oct 14 21:57:29 2011 New Revision: 1183518
URL: http://svn.apache.org/viewvc?rev=1183518&view=rev Log: Add 0.8+ types and key validation type to pig schema. Patch by Steeve Morin, reviewed by brandonwilliams for CASSANDRA-3280 Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1183518&r1=1183517&r2=1183518&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Oct 14 21:57:29 2011 @@ -15,6 +15,8 @@ * (Hadoop) accept comma delimited lists of initial thrift connections (CASSANDRA-3185) * ColumnFamily min_compaction_threshold should be >= 2 (CASSANDRA-3342) + * (Pig) add 0.8+ types and key validation type in schema (CASSANDRA-3280) + 0.8.7 * Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201) Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1183518&r1=1183517&r2=1183518&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Oct 14 21:57:29 2011 @@ -107,7 +107,7 @@ public class CassandraStorage extends Lo return limit; } - @Override + @Override public Tuple getNext() throws IOException { try @@ -122,7 +122,7 @@ public class CassandraStorage extends Lo assert key != null && cf != null; // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(2); + Tuple tuple = TupleFactory.getInstance().newTuple(2); ArrayList<Tuple> columns = new ArrayList<Tuple>(); tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) @@ -187,10 +187,12 @@ public class CassandraStorage extends Lo ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>(); AbstractType comparator = null; AbstractType default_validator = null; + AbstractType key_validator = null; try { - comparator = TypeParser.parse(cfDef.comparator_type); - default_validator = TypeParser.parse(cfDef.default_validation_class); + comparator = TypeParser.parse(cfDef.getComparator_type()); + default_validator = TypeParser.parse(cfDef.getDefault_validation_class()); + key_validator = TypeParser.parse(cfDef.getKey_validation_class()); } catch (ConfigurationException e) { @@ -199,13 +201,14 @@ public class CassandraStorage extends Lo marshallers.add(comparator); marshallers.add(default_validator); + marshallers.add(key_validator); return marshallers; } - private Map<ByteBuffer,AbstractType> getValidatorMap(CfDef cfDef) throws IOException + private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException { Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>(); - for (ColumnDef cd : cfDef.column_metadata) + for (ColumnDef cd : cfDef.getColumn_metadata()) { if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty()) { @@ -236,6 +239,18 @@ public class CassandraStorage extends Lo this.reader = reader; } + public static Map<String, String> getQueryMap(String query) + { + String[] params = query.split("&"); + Map<String, String> map = new HashMap<String, String>(); + for (String param : params) + { + String[] keyValue = param.split("="); + map.put(keyValue[0], keyValue[1]); + } + return map; + } + private void setLocationFromUri(String location) throws IOException { // parse uri into keyspace and columnfamily @@ -247,18 +262,18 @@ public class CassandraStorage extends Lo String[] urlParts = location.split("\\?"); if (urlParts.length > 1) { - for (String param : urlParts[1].split("&")) - { - String[] pair = param.split("="); - if (pair[0].equals("slice_start")) - slice_start = ByteBufferUtil.bytes(pair[1]); - else if (pair[0].equals("slice_end")) - slice_end = ByteBufferUtil.bytes(pair[1]); - else if (pair[0].equals("reversed")) - slice_reverse = Boolean.parseBoolean(pair[1]); - else if (pair[0].equals("limit")) - limit = Integer.parseInt(pair[1]); - } + Map<String, String> urlQuery = getQueryMap(urlParts[1]); + AbstractType comparator = BytesType.instance; + if (urlQuery.containsKey("comparator")) + comparator = TypeParser.parse(urlQuery.get("comparator")); + if (urlQuery.containsKey("slice_start")) + slice_start = comparator.fromString(urlQuery.get("slice_start")); + if (urlQuery.containsKey("slice_end")) + slice_end = comparator.fromString(urlQuery.get("slice_end")); + if (urlQuery.containsKey("reversed")) + slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed")); + if (urlQuery.containsKey("limit")) + limit = Integer.parseInt(urlQuery.get("limit")); } String[] parts = urlParts[0].split("/+"); keyspace = parts[1]; @@ -312,10 +327,14 @@ public class CassandraStorage extends Lo // top-level schema, no type ResourceSchema schema = new ResourceSchema(); + // get default marshallers and validators + List<AbstractType> marshallers = getDefaultMarshallers(cfDef); + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + // add key ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema(); keyFieldSchema.setName("key"); - keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type + keyFieldSchema.setType(getPigType(marshallers.get(2))); // will become the bag of tuples ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema(); @@ -323,9 +342,6 @@ public class CassandraStorage extends Lo bagFieldSchema.setType(DataType.BAG); ResourceSchema bagSchema = new ResourceSchema(); - - List<AbstractType> marshallers = getDefaultMarshallers(cfDef); - Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>(); // default comparator/validator @@ -381,6 +397,10 @@ public class CassandraStorage extends Lo return DataType.CHARARRAY; else if (type instanceof UTF8Type) return DataType.CHARARRAY; + else if (type instanceof FloatType) + return DataType.FLOAT; + else if (type instanceof DoubleType) + return DataType.DOUBLE; return DataType.BYTEARRAY; }