Pig: enable secondary index usage via pushdown.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-4238


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db68e03f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db68e03f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db68e03f

Branch: refs/heads/trunk
Commit: db68e03f7de49935b2f0e43a91a4a8d2ca134a08
Parents: dad8d80
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Sat May 26 10:38:42 2012 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Sat May 26 10:38:42 2012 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java     |  147 ++++++++++++++-
 1 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
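Before the diff itself, a hedged sketch of what the new code path amounts to on the Hadoop side: when the PIG_USE_SECONDARY environment variable is true and Pig pushes a filter on an indexed column down to the loader, the filter is turned into Thrift IndexExpressions and handed to ConfigHelper.setInputRange(). The keyspace, column family, and indexed column below ("Keyspace1", "Users", state == 'TX') are made-up placeholders, not part of the patch.

// Hedged sketch only: "Keyspace1", "Users", and the indexed column "state"
// are hypothetical; the patch derives the expressions from the filter Pig
// passes to setPartitionFilter().
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;

public class PushdownConfigSketch
{
    public static void configure(Configuration conf)
    {
        // same input column family setup CassandraStorage.setLocation() performs
        ConfigHelper.setInputColumnFamily(conf, "Keyspace1", "Users", false);

        // equivalent of a pushed-down Pig filter such as: state == 'TX'
        List<IndexExpression> exprs = new ArrayList<IndexExpression>();
        exprs.add(new IndexExpression(ByteBufferUtil.bytes("state"),
                                      IndexOperator.EQ,
                                      ByteBufferUtil.bytes("TX")));

        // hand the expressions to the input format so only matching rows are read
        ConfigHelper.setInputRange(conf, exprs);
    }
}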


http://git-wip-us.apache.org/repos/asf/cassandra/blob/db68e03f/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index fef7412..5742cb9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -79,10 +79,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
     public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
+    public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
 
     private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
     private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
     private final static boolean DEFAULT_WIDEROW_INPUT = false;
+    private final static boolean DEFAULT_USE_SECONDARY = false;
+
+    private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -103,6 +107,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String outputFormatClass;
     private int limit;
     private boolean widerows;
+    private boolean usePartitionFilter;
     // wide row hacks
     private Map<ByteBuffer,IColumn> lastRow;
     private boolean hasNext = true;
@@ -244,6 +249,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
             tuple.append(bag);
+            // finally, special top-level indexes if needed
+            if (usePartitionFilter)
+            {
+                for (ColumnDef cdef : getIndexes())
+                {
+                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
+                    tuple.append(throwaway.get(1));
+                }
+            }
             return tuple;
         }
         catch (InterruptedException e)
@@ -326,6 +340,16 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return cfdefFromString(property.getProperty(signature));
     }
 
+    private List<IndexExpression> getIndexExpressions()
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
+            return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
+        else
+            return null;
+    }
+
     private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
     {
         ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
@@ -511,6 +535,13 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         widerows = DEFAULT_WIDEROW_INPUT;
         if (System.getenv(PIG_WIDEROW_INPUT) != null)
             widerows = Boolean.valueOf(System.getProperty(PIG_WIDEROW_INPUT));
+        usePartitionFilter = DEFAULT_USE_SECONDARY;
+        if (System.getenv() != null)
+            usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
+
+        if (usePartitionFilter && getIndexExpressions() != null)
+            ConfigHelper.setInputRange(conf, getIndexExpressions());
+
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();
 
@@ -602,6 +633,20 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         // bag at the end for unknown columns
         allSchemaFields.add(bagField);
 
+        // add top-level index elements if needed
+        if (usePartitionFilter)
+        {
+            for (ColumnDef cdef : getIndexes())
+            {
+                ResourceFieldSchema idxSchema = new ResourceFieldSchema();
+                idxSchema.setName("index_" + new String(cdef.getName()));
+                AbstractType validator = validators.get(cdef.name);
+                if (validator == null)
+                    validator = marshallers.get(1);
+                idxSchema.setType(getPigType(validator));
+                allSchemaFields.add(idxSchema);
+            }
+        }
         // top level schema contains everything
         schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
         return schema;
@@ -634,12 +679,67 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
     public String[] getPartitionKeys(String location, Job job)
     {
-        return null;
+        if (!usePartitionFilter)
+            return null;
+        List<ColumnDef> indexes = getIndexes();
+        String[] partitionKeys = new String[indexes.size()];
+        for (int i = 0; i < indexes.size(); i++)
+        {
+            partitionKeys[i] = new String(indexes.get(i).getName());
+        }
+        return partitionKeys;
     }
 
     public void setPartitionFilter(Expression partitionFilter)
     {
-        // no-op
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
+    }
+
+    private List<IndexExpression> filterToIndexExpressions(Expression expression)
+    {
+        List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>();
+        Expression.BinaryExpression be = (Expression.BinaryExpression)expression;
+        ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes());
+        ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes());
+        switch (expression.getOpType())
+        {
+            case OP_EQ:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value));
+                break;
+            case OP_GE:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value));
+                break;
+            case OP_GT:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value));
+                break;
+            case OP_LE:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value));
+                break;
+            case OP_LT:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value));
+                break;
+            case OP_AND:
+                indexExpressions.addAll(filterToIndexExpressions(be.getLhs()));
+                indexExpressions.addAll(filterToIndexExpressions(be.getRhs()));
+                break;
+            default:
+                throw new RuntimeException("Unsupported expression type: " + expression.getOpType().name());
+        }
+        return indexExpressions;
+    }
+
+    private List<ColumnDef> getIndexes()
+    {
+        CfDef cfdef = getCfDef(loadSignature);
+        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+        for (ColumnDef cdef : cfdef.column_metadata)
+        {
+            if (cdef.index_type != null)
+                indexes.add(cdef);
+        }
+        return indexes;
     }
 
     @Override
@@ -679,6 +779,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         if (ConfigHelper.getOutputPartitioner(conf) == null)
             throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER 
environment variable not set");
 
+        // we have to do this again here for the check in writeColumnsFromTuple
+        usePartitionFilter = DEFAULT_USE_SECONDARY;
+        if (System.getenv() != null)
+            usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
+
         initSchema(storeSignature);
     }
 
@@ -765,8 +870,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 if (inner.size() > 0) // may be empty, for an indexed column that wasn't present
                     mutationList.add(mutationFromTuple(inner));
             }
-            else
+            else if (!usePartitionFilter)
+            {
                 throw new IOException("Output type was not a bag or a tuple");
+            }
         }
         if (mutationList.size() > 0)
             writeMutations(key, mutationList);
@@ -947,5 +1054,39 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
         return cfDef;
     }
+
+    private static String indexExpressionsToString(List<IndexExpression> indexExpressions)
+    {
+        assert indexExpressions != null;
+        // oh, you thought cfdefToString was awful?
+        IndexClause indexClause = new IndexClause();
+        indexClause.setExpressions(indexExpressions);
+        indexClause.setStart_key("".getBytes());
+        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            return Hex.bytesToHex(serializer.serialize(indexClause));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static List<IndexExpression> indexExpressionsFromString(String ie)
+    {
+        assert ie != null;
+        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        IndexClause indexClause = new IndexClause();
+        try
+        {
+            deserializer.deserialize(indexClause, Hex.hexToBytes(ie));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return indexClause.getExpressions();
+    }
 }
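A note on the two helpers at the end of the patch: the pushed-down filter has to survive the hop from Pig's front end to the Hadoop tasks, so it is Thrift-serialized, hex-encoded, and stashed in a UDFContext property under PARTITION_FILTER_SIGNATURE. Below is a minimal standalone sketch of that round-trip; the state == 'TX' expression is a made-up example and the class name is arbitrary.

import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Hex;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class IndexExpressionRoundTrip
{
    public static void main(String[] args) throws Exception
    {
        // expressions as filterToIndexExpressions() would build them for a
        // hypothetical pushed-down filter: state == 'TX'
        List<IndexExpression> exprs = new ArrayList<IndexExpression>();
        exprs.add(new IndexExpression(ByteBufferUtil.bytes("state"),
                                      IndexOperator.EQ,
                                      ByteBufferUtil.bytes("TX")));

        // front end (indexExpressionsToString): wrap, serialize, hex-encode
        IndexClause clause = new IndexClause();
        clause.setExpressions(exprs);
        clause.setStart_key("".getBytes());
        String hex = Hex.bytesToHex(new TSerializer(new TBinaryProtocol.Factory()).serialize(clause));

        // back end (indexExpressionsFromString): decode and deserialize
        IndexClause decoded = new IndexClause();
        new TDeserializer(new TBinaryProtocol.Factory()).deserialize(decoded, Hex.hexToBytes(hex));
        System.out.println(decoded.getExpressions());
    }
}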
 
