Pig support for CQL collections.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a

Branch: refs/heads/trunk
Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204
Parents: caef32e
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Tue Sep 10 13:28:23 2013 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Tue Sep 10 13:28:23 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/pig/AbstractCassandraStorage.java    | 78 +++++++++++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
 3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2328bf7..4d5b446 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
  * Fix loading index summary containing empty key (CASSANDRA-5965)
  * Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
 
 
 1.2.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 59d7817..03805d2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return DataType.FLOAT;
         else if (type instanceof DoubleType)
             return DataType.DOUBLE;
-        else if (type instanceof AbstractCompositeType )
+        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
             return DataType.TUPLE;
 
         return DataType.BYTEARRAY;
@@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
         if(o instanceof Tuple) {
             List<Object> objects = ((Tuple)o).getAll();
-            List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-            int totalLength = 0;
-            for(Object sub : objects)
+            //collections
+            if (objects.size() > 0 && objects.get(0) instanceof String)
             {
-                ByteBuffer buffer = objToBB(sub);
-                serialized.add(buffer);
-                totalLength += 2 + buffer.remaining() + 1;
-            }
-            ByteBuffer out = ByteBuffer.allocate(totalLength);
-            for (ByteBuffer bb : serialized)
-            {
-                int length = bb.remaining();
-                out.put((byte) ((length >> 8) & 0xFF));
-                out.put((byte) (length & 0xFF));
-                out.put(bb);
-                out.put((byte) 0);
+                String collectionType = (String) objects.get(0);
+                if ("set".equalsIgnoreCase(collectionType) ||
+                        "list".equalsIgnoreCase(collectionType))
+                    return objToListOrSetBB(objects.subList(1, objects.size()));
+                else if ("map".equalsIgnoreCase(collectionType))
+                    return objToMapBB(objects.subList(1, objects.size()));
+                   
             }
-            out.flip();
-            return out;
+            return objToCompositeBB(objects);
         }
 
         return ByteBuffer.wrap(((DataByteArray) o).get());
     }
 
+    private ByteBuffer objToListOrSetBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+        }      
+        return CollectionType.pack(serialized, objects.size());
+    }
+
+    private ByteBuffer objToMapBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for(Object sub : objects)
+        {
+            List<Object> keyValue = ((Tuple)sub).getAll();
+            for (Object entry: keyValue)
+            {
+                ByteBuffer buffer = objToBB(entry);
+                serialized.add(buffer);
+            }
+        } 
+        return CollectionType.pack(serialized, objects.size());
+    }
+
+    private ByteBuffer objToCompositeBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        int totalLength = 0;
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+            totalLength += 2 + buffer.remaining() + 1;
+        }
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : serialized)
+        {
+            int length = bb.remaining();
+            out.put((byte) ((length >> 8) & 0xFF));
+            out.put((byte) (length & 0xFF));
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
+
     public void cleanupOnFailure(String failure, Job job)
     {
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7e22823..a73e5a5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -35,6 +35,7 @@ import org.apache.pig.Expression;
 import org.apache.pig.Expression.OpType;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
@@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage
                 if (columnValue != null)
                 {
                     IColumn column = new Column(cdef.name, columnValue);
-                    tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+                    AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+                    setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
                 }
                 else
-                    tuple.set(i, TupleFactory.getInstance().newTuple());
+                    tuple.set(i, null);
                 i++;
             }
             return tuple;
@@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage
         }
     }
 
+    /** set the value to the position of the tuple */
+    private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof CollectionType)
+            setCollectionTupleValues(tuple, position, value, validator);
+        else
+           setTupleValue(tuple, position, value);
+    }
+
+    /** set the values of set/list at and after the position of the tuple */
+    private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof MapType)
+        {
+            setMapTupleValues(tuple, position, value, validator);
+            return;
+        }
+        AbstractType<?> elementValidator;
+        if (validator instanceof SetType)
+            elementValidator = ((SetType<?>) validator).elements;
+        else if (validator instanceof ListType)
+            elementValidator = ((ListType<?>) validator).elements;
+        else 
+            return;
+        
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+        for (Object entry : (Collection<?>) value)
+        {
+            setTupleValue(innerTuple, i, entry, elementValidator);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** set the values of set/list at and after the position of the tuple */
+    private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+        AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+        
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+        for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
+        {
+            Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+            setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+            setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+            innerTuple.set(i, mapEntryTuple);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** convert a cql column to an object */
+    private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
+    {
+        // standard
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+        if (validators.get(col.name()) == null)
+        {
+            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+            return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+        }
+        else
+            return validators.get(col.name()).compose(col.value());
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage
 
                 // output prepared statement
                 if (urlQuery.containsKey("output_query"))
-                    outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+                    outputQuery = urlQuery.get("output_query");
 
                 // user defined where clause
                 if (urlQuery.containsKey("where_clause"))
@@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage
         String name = be.getLhs().toString();
         String value = be.getRhs().toString();
         OpType op = expression.getOpType();
-        String opString = op.name();
+        String opString = op.toString();
         switch (op)
         {
             case OP_EQ:

Reply via email to