Pig support for CQL collections. Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a Branch: refs/heads/trunk Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204 Parents: caef32e Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Sep 10 13:28:23 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Tue Sep 10 13:28:23 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++----- .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++- 3 files changed, 135 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2328bf7..4d5b446 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ * Fix streaming does not transfer wrapped range (CASSANDRA-5948) * Fix loading index summary containing empty key (CASSANDRA-5965) * Correctly handle limits in CompositesSearcher (CASSANDRA-5975) + * Pig: handle CQL collections (CASSANDRA-5867) 1.2.9 http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index 59d7817..03805d2 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store return DataType.FLOAT; else if (type instanceof DoubleType) return DataType.DOUBLE; - else if (type instanceof AbstractCompositeType ) + else if (type instanceof AbstractCompositeType || type instanceof CollectionType) return DataType.TUPLE; return DataType.BYTEARRAY; @@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store return ByteBuffer.wrap(UUIDGen.decompose((UUID) o)); if(o instanceof Tuple) { List<Object> objects = ((Tuple)o).getAll(); - List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); - int totalLength = 0; - for(Object sub : objects) + //collections + if (objects.size() > 0 && objects.get(0) instanceof String) { - ByteBuffer buffer = objToBB(sub); - serialized.add(buffer); - totalLength += 2 + buffer.remaining() + 1; - } - ByteBuffer out = ByteBuffer.allocate(totalLength); - for (ByteBuffer bb : serialized) - { - int length = bb.remaining(); - out.put((byte) ((length >> 8) & 0xFF)); - out.put((byte) (length & 0xFF)); - out.put(bb); - out.put((byte) 0); + String collectionType = (String) objects.get(0); + if ("set".equalsIgnoreCase(collectionType) || + "list".equalsIgnoreCase(collectionType)) + return objToListOrSetBB(objects.subList(1, objects.size())); + else if ("map".equalsIgnoreCase(collectionType)) + return objToMapBB(objects.subList(1, objects.size())); + } - out.flip(); - return out; + return objToCompositeBB(objects); } return ByteBuffer.wrap(((DataByteArray) o).get()); } + private ByteBuffer objToListOrSetBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); + for(Object sub : objects) + { + ByteBuffer buffer = objToBB(sub); + serialized.add(buffer); + } + return CollectionType.pack(serialized, objects.size()); + } + + private ByteBuffer objToMapBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2); + for(Object sub : objects) + { + List<Object> keyValue = ((Tuple)sub).getAll(); + for (Object entry: keyValue) + { + ByteBuffer buffer = objToBB(entry); + serialized.add(buffer); + } + } + return CollectionType.pack(serialized, objects.size()); + } + + private ByteBuffer objToCompositeBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); + int totalLength = 0; + for(Object sub : objects) + { + ByteBuffer buffer = objToBB(sub); + serialized.add(buffer); + totalLength += 2 + buffer.remaining() + 1; + } + ByteBuffer out = ByteBuffer.allocate(totalLength); + for (ByteBuffer bb : serialized) + { + int length = bb.remaining(); + out.put((byte) ((length >> 8) & 0xFF)); + out.put((byte) (length & 0xFF)); + out.put(bb); + out.put((byte) 0); + } + out.flip(); + return out; + } + public void cleanupOnFailure(String failure, Job job) { } http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 7e22823..a73e5a5 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -35,6 +35,7 @@ import org.apache.pig.Expression; import org.apache.pig.Expression.OpType; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; import org.apache.pig.impl.util.UDFContext; @@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage if (columnValue != null) { IColumn column = new Column(cdef.name, columnValue); - tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance)); + AbstractType<?> validator = getValidatorMap(cfDef).get(column.name()); + setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator); } else - tuple.set(i, TupleFactory.getInstance().newTuple()); + tuple.set(i, null); i++; } return tuple; @@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage } } + /** set the value to the position of the tuple */ + private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException + { + if (validator instanceof CollectionType) + setCollectionTupleValues(tuple, position, value, validator); + else + setTupleValue(tuple, position, value); + } + + /** set the values of set/list at and after the position of the tuple */ + private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException + { + if (validator instanceof MapType) + { + setMapTupleValues(tuple, position, value, validator); + return; + } + AbstractType<?> elementValidator; + if (validator instanceof SetType) + elementValidator = ((SetType<?>) validator).elements; + else if (validator instanceof ListType) + elementValidator = ((ListType<?>) validator).elements; + else + return; + + int i = 0; + Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size()); + for (Object entry : (Collection<?>) value) + { + setTupleValue(innerTuple, i, entry, elementValidator); + i++; + } + tuple.set(position, innerTuple); + } + + /** set the values of set/list at and after the position of the tuple */ + private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException + { + AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys; + AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values; + + int i = 0; + Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size()); + for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet()) + { + Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2); + setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator); + setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator); + innerTuple.set(i, mapEntryTuple); + i++; + } + tuple.set(position, innerTuple); + } + + /** convert a cql column to an object */ + private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException + { + // standard + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + if (validators.get(col.name()) == null) + { + Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); + return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value()); + } + else + return validators.get(col.name()).compose(col.value()); + } + /** set read configuration settings */ public void setLocation(String location, Job job) throws IOException { @@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage // output prepared statement if (urlQuery.containsKey("output_query")) - outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "="); + outputQuery = urlQuery.get("output_query"); // user defined where clause if (urlQuery.containsKey("where_clause")) @@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage String name = be.getLhs().toString(); String value = be.getRhs().toString(); OpType op = expression.getOpType(); - String opString = op.name(); + String opString = op.toString(); switch (op) { case OP_EQ: