Author: brandonwilliams Date: Wed Sep 28 22:01:01 2011 New Revision: 1177083
URL: http://svn.apache.org/viewvc?rev=1177083&view=rev Log: Pig storage handler implements LoadMetadata interface. Patch by brandonwilliams, reviewed by Jeremy Hanna for CASSANDRA-2777 Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1177083&r1=1177082&r2=1177083&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Sep 28 22:01:01 2011 @@ -31,7 +31,7 @@ import org.apache.commons.logging.LogFac import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.SuperColumn; -import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.hadoop.*; import org.apache.cassandra.thrift.Mutation; import org.apache.cassandra.thrift.Deletion; @@ -46,6 +46,7 @@ import org.apache.pig.*; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.UDFContext; import org.apache.thrift.TDeserializer; @@ -61,7 +62,7 @@ import org.apache.thrift.transport.TTran * * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))). */ -public class CassandraStorage extends LoadFunc implements StoreFuncInterface +public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata { // system environment variables that can be set to configure connection info: // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper @@ -293,6 +294,103 @@ public class CassandraStorage extends Lo initSchema(loadSignature); } + public ResourceSchema getSchema(String location, Job job) throws IOException + { + setLocation(location, job); + CfDef cfDef = getCfDef(loadSignature); + + if (cfDef.column_type.equals("Super")) + return null; + // top-level schema, no type + ResourceSchema schema = new ResourceSchema(); + + // add key + ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema(); + keyFieldSchema.setName("key"); + keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type + + // will become the bag of tuples + ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema(); + bagFieldSchema.setName("columns"); + bagFieldSchema.setType(DataType.BAG); + ResourceSchema bagSchema = new ResourceSchema(); + + + List<AbstractType> marshallers = getDefaultMarshallers(cfDef); + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>(); + + // default comparator/validator + ResourceSchema innerTupleSchema = new ResourceSchema(); + ResourceFieldSchema tupleField = new ResourceFieldSchema(); + tupleField.setType(DataType.TUPLE); + tupleField.setSchema(innerTupleSchema); + + ResourceFieldSchema colSchema = new ResourceFieldSchema(); + colSchema.setName("name"); + colSchema.setType(getPigType(marshallers.get(0))); + tupleFields.add(colSchema); + + ResourceFieldSchema valSchema = new ResourceFieldSchema(); + AbstractType validator = marshallers.get(1); + valSchema.setName("value"); + valSchema.setType(getPigType(validator)); + tupleFields.add(valSchema); + + // defined validators/indexes + for (ColumnDef cdef : cfDef.column_metadata) + { + colSchema = new ResourceFieldSchema(); + colSchema.setName(new String(cdef.getName())); + colSchema.setType(getPigType(marshallers.get(0))); + tupleFields.add(colSchema); + + valSchema = new ResourceFieldSchema(); + validator = validators.get(cdef.getName()); + if (validator == null) + validator = marshallers.get(1); + valSchema.setName("value"); + valSchema.setType(getPigType(validator)); + tupleFields.add(valSchema); + } + innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()])); + + // a bag can contain only one tuple, but that tuple can contain anything + bagSchema.setFields(new ResourceFieldSchema[] { tupleField }); + bagFieldSchema.setSchema(bagSchema); + // top level schema contains everything + schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema }); + return schema; + } + + private byte getPigType(AbstractType type) + { + if (type instanceof LongType) + return DataType.LONG; + else if (type instanceof IntegerType) + return DataType.INTEGER; + else if (type instanceof AsciiType) + return DataType.CHARARRAY; + else if (type instanceof UTF8Type) + return DataType.CHARARRAY; + return DataType.BYTEARRAY; + } + + public ResourceStatistics getStatistics(String location, Job job) + { + return null; + } + + public String[] getPartitionKeys(String location, Job job) + { + return null; + } + + public void setPartitionFilter(Expression partitionFilter) + { + // no-op + } + @Override public String relativeToAbsolutePath(String location, Path curDir) throws IOException {