Pig: enable secondary index usage via pushdown. Patch by brandonwilliams, reviewed by xedin for CASSANDRA-4238
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db68e03f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db68e03f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db68e03f

Branch: refs/heads/trunk
Commit: db68e03f7de49935b2f0e43a91a4a8d2ca134a08
Parents: dad8d80
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Sat May 26 10:38:42 2012 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Sat May 26 10:38:42 2012 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java  |  147 ++++++++++++++-
 1 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/db68e03f/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index fef7412..5742cb9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -79,10 +79,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
     public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
+    public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
 
     private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
     private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
     private final static boolean DEFAULT_WIDEROW_INPUT = false;
+    private final static boolean DEFAULT_USE_SECONDARY = false;
+
+    private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -103,6 +107,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String outputFormatClass;
     private int limit;
     private boolean widerows;
+    private boolean usePartitionFilter;
     // wide row hacks
     private Map<ByteBuffer,IColumn> lastRow;
     private boolean hasNext = true;
@@ -244,6 +249,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
             tuple.append(bag);
+            // finally, special top-level indexes if needed
+            if (usePartitionFilter)
+            {
+                for (ColumnDef cdef : getIndexes())
+                {
+                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
+                    tuple.append(throwaway.get(1));
+                }
+            }
             return tuple;
         }
         catch (InterruptedException e)
@@ -326,6 +340,16 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return cfdefFromString(property.getProperty(signature));
     }
 
+    private List<IndexExpression> getIndexExpressions()
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
+            return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
+        else
+            return null;
+    }
+
     private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
     {
         ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
@@ -511,6 +535,13 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         widerows = DEFAULT_WIDEROW_INPUT;
         if (System.getenv(PIG_WIDEROW_INPUT) != null)
             widerows = Boolean.valueOf(System.getProperty(PIG_WIDEROW_INPUT));
+        usePartitionFilter = DEFAULT_USE_SECONDARY;
+        if (System.getenv() != null)
+            usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
+
+        if (usePartitionFilter && getIndexExpressions() != null)
+            ConfigHelper.setInputRange(conf, getIndexExpressions());
+
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();
 
@@ -602,6 +633,20 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         // bag at the end for unknown columns
         allSchemaFields.add(bagField);
 
+        // add top-level index elements if needed
+        if (usePartitionFilter)
+        {
+            for (ColumnDef cdef : getIndexes())
+            {
+                ResourceFieldSchema idxSchema = new ResourceFieldSchema();
+                idxSchema.setName("index_" + new String(cdef.getName()));
+                AbstractType validator = validators.get(cdef.name);
+                if (validator == null)
+                    validator = marshallers.get(1);
+                idxSchema.setType(getPigType(validator));
+                allSchemaFields.add(idxSchema);
+            }
+        }
         // top level schema contains everything
         schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
         return schema;
@@ -634,12 +679,67 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
     public String[] getPartitionKeys(String location, Job job)
     {
-        return null;
+        if (!usePartitionFilter)
+            return null;
+        List<ColumnDef> indexes = getIndexes();
+        String[] partitionKeys = new String[indexes.size()];
+        for (int i = 0; i < indexes.size(); i++)
+        {
+            partitionKeys[i] = new String(indexes.get(i).getName());
+        }
+        return partitionKeys;
     }
 
     public void setPartitionFilter(Expression partitionFilter)
     {
-        // no-op
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
+    }
+
+    private List<IndexExpression> filterToIndexExpressions(Expression expression)
+    {
+        List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>();
+        Expression.BinaryExpression be = (Expression.BinaryExpression)expression;
+        ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes());
+        ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes());
+        switch (expression.getOpType())
+        {
+            case OP_EQ:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value));
+                break;
+            case OP_GE:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value));
+                break;
+            case OP_GT:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value));
+                break;
+            case OP_LE:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value));
+                break;
+            case OP_LT:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value));
+                break;
+            case OP_AND:
+                indexExpressions.addAll(filterToIndexExpressions(be.getLhs()));
+                indexExpressions.addAll(filterToIndexExpressions(be.getRhs()));
+                break;
+            default:
+                throw new RuntimeException("Unsupported expression type: " + expression.getOpType().name());
+        }
+        return indexExpressions;
+    }
+
+    private List<ColumnDef> getIndexes()
+    {
+        CfDef cfdef = getCfDef(loadSignature);
+        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+        for (ColumnDef cdef : cfdef.column_metadata)
+        {
+            if (cdef.index_type != null)
+                indexes.add(cdef);
+        }
+        return indexes;
     }
 
     @Override
@@ -679,6 +779,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         if (ConfigHelper.getOutputPartitioner(conf) == null)
             throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
 
+        // we have to do this again here for the check in writeColumnsFromTuple
+        usePartitionFilter = DEFAULT_USE_SECONDARY;
+        if (System.getenv() != null)
+            usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
+
         initSchema(storeSignature);
     }
 
@@ -765,8 +870,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 if (inner.size() > 0) // may be empty, for an indexed column that wasn't present
                     mutationList.add(mutationFromTuple(inner));
             }
-            else
+            else if (!usePartitionFilter)
+            {
                 throw new IOException("Output type was not a bag or a tuple");
+            }
         }
         if (mutationList.size() > 0)
             writeMutations(key, mutationList);
@@ -947,5 +1054,39 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
         return cfDef;
     }
+
+    private static String indexExpressionsToString(List<IndexExpression> indexExpressions)
+    {
+        assert indexExpressions != null;
+        // oh, you thought cfdefToString was awful?
+        IndexClause indexClause = new IndexClause();
+        indexClause.setExpressions(indexExpressions);
+        indexClause.setStart_key("".getBytes());
+        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            return Hex.bytesToHex(serializer.serialize(indexClause));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static List<IndexExpression> indexExpressionsFromString(String ie)
+    {
+        assert ie != null;
+        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        IndexClause indexClause = new IndexClause();
+        try
+        {
+            deserializer.deserialize(indexClause, Hex.hexToBytes(ie));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return indexClause.getExpressions();
+    }
 }
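
For reference, a minimal sketch of how a Pig script might exercise the new pushdown path. The keyspace, column family, and indexed column name below are hypothetical, and PIG_USE_SECONDARY=true is assumed to be exported in the environment so that usePartitionFilter is enabled:

    -- With PIG_USE_SECONDARY=true, CassandraStorage reports indexed columns
    -- through getPartitionKeys(), allowing Pig to hand the filter below to the
    -- loader instead of evaluating it inside the MapReduce job.
    rows = LOAD 'cassandra://Keyspace1/Indexed1'
           USING org.apache.cassandra.hadoop.pig.CassandraStorage();

    -- A comparison passed to setPartitionFilter() is translated into an
    -- IndexExpression (EQ here; GT/GTE/LT/LTE and AND are also handled) and
    -- applied server-side via ConfigHelper.setInputRange().
    texans = FILTER rows BY state == 'TX';

    DUMP texans;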