Updated Branches: refs/heads/cassandra-1.0 ebafaeba3 -> b61f1d43e
Pig: support for composite columns Patch by Janne Jalkanen, reviewed by brandonwilliams for CASSANDRA-3684 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b61f1d43 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b61f1d43 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b61f1d43 Branch: refs/heads/cassandra-1.0 Commit: b61f1d43e74134b1c4fc27b68d27b59f9d3739d5 Parents: ebafaeb Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed Feb 29 14:33:20 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed Feb 29 14:33:20 2012 -0600 ---------------------------------------------------------------------- .../cassandra/hadoop/pig/CassandraStorage.java | 43 ++++++++++++--- contrib/pig/test/populate-cli.txt | 20 +++++++ contrib/pig/test/test_storage.pig | 21 +++++++ .../db/marshal/AbstractCompositeType.java | 32 +++++++++++ 4 files changed, 109 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 6d1f76a..9c2dded 100644 --- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.TypeParser; @@ -35,6 +36,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent; import org.apache.cassandra.hadoop.*; import org.apache.cassandra.thrift.Mutation; import org.apache.cassandra.thrift.Deletion; @@ -100,7 +102,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo this.limit = limit; } - public int getLimit() + public int getLimit() { return limit; } @@ -155,13 +157,37 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo } } + /** + * Deconstructs a composite type to a Tuple. + */ + private Tuple composeComposite( AbstractCompositeType comparator, ByteBuffer name ) throws IOException + { + List<CompositeComponent> result = comparator.deconstruct( name ); + + Tuple t = TupleFactory.getInstance().newTuple( result.size() ); + + for( int i = 0; i < result.size(); i++ ) + { + setTupleValue( t, i, result.get(i).comparator.compose( result.get(i).value ) ); + } + + return t; + } + private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException { Tuple pair = TupleFactory.getInstance().newTuple(2); List<AbstractType> marshallers = getDefaultMarshallers(cfDef); Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); - setTupleValue(pair, 0, comparator.compose(col.name())); + if( comparator instanceof AbstractCompositeType ) + { + setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name())); + } + else + { + setTupleValue(pair, 0, comparator.compose(col.name())); + } if (col instanceof Column) { // standard @@ -321,15 +347,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { if (System.getenv(PIG_RPC_PORT) != null) ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT)); - else if (ConfigHelper.getRpcPort(conf) == 0) + else if (ConfigHelper.getRpcPort(conf) == 0) throw new IOException("PIG_RPC_PORT environment variable not set"); if (System.getenv(PIG_INITIAL_ADDRESS) != null) ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); - else if (ConfigHelper.getInitialAddress(conf) == null) + else if (ConfigHelper.getInitialAddress(conf) == null) throw new IOException("PIG_INITIAL_ADDRESS environment variable not set"); if (System.getenv(PIG_PARTITIONER) != null) ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER)); - else if (ConfigHelper.getPartitioner(conf) == null) + else if (ConfigHelper.getPartitioner(conf) == null) throw new IOException("PIG_PARTITIONER environment variable not set"); if (System.getenv(PIG_ALLOW_DELETES) != null) allow_deletes = Boolean.valueOf(System.getenv(PIG_ALLOW_DELETES)); @@ -449,6 +475,9 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo return DataType.FLOAT; else if (type instanceof DoubleType) return DataType.DOUBLE; + else if (type instanceof AbstractCompositeType ) + return DataType.TUPLE; + return DataType.BYTEARRAY; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/test/populate-cli.txt ---------------------------------------------------------------------- diff --git a/contrib/pig/test/populate-cli.txt b/contrib/pig/test/populate-cli.txt index 0164afe..c8124dd 100644 --- a/contrib/pig/test/populate-cli.txt +++ b/contrib/pig/test/populate-cli.txt @@ -86,3 +86,23 @@ incr CC['chuck']['kick']; incr CC['chuck']['kick']; incr CC['chuck']['kick']; incr CC['chuck']['fist']; + +create column family Compo + with key_validation_class = UTF8Type + and default_validation_class = UTF8Type + and comparator = 'CompositeType(UTF8Type,UTF8Type)'; + +set Compo['punch']['bruce:lee'] = 'ouch'; +set Compo['punch']['bruce:bruce'] = 'hunh?'; +set Compo['kick']['bruce:lee'] = 'oww'; +set Compo['kick']['bruce:bruce'] = 'watch it, mate'; + +create column family CompoInt + with key_validation_class = UTF8Type + and default_validation_class = UTF8Type + and comparator = 'CompositeType(LongType,LongType)'; + +set CompoInt['clock']['1:0'] = 'z'; +set CompoInt['clock']['1:30'] = 'zzzz'; +set CompoInt['clock']['2:30'] = 'daddy?'; +set CompoInt['clock']['6:30'] = 'coffee...'; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/test/test_storage.pig ---------------------------------------------------------------------- diff --git a/contrib/pig/test/test_storage.pig b/contrib/pig/test/test_storage.pig index c49d4b3..a0157f7 100644 --- a/contrib/pig/test/test_storage.pig +++ b/contrib/pig/test/test_storage.pig @@ -47,3 +47,24 @@ CC = load 'cassandra://PigTest/CC' using CassandraStorage(); total_hits = foreach CC generate key, SUM(columns.value); dump total_hits; + +-- +-- Test CompositeType +-- + +compo = load 'cassandra://PigTest/Compo' using CassandraStorage(); + +compo = foreach compo generate key as method, flatten(columns); + +lee = filter compo by columns::name == ('bruce','lee'); + +dump lee; + +night = load 'cassandra://PigTest/CompoInt' using CassandraStorage(); +night = foreach night generate flatten(columns); +night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as noise; + +-- What happens at the darkest hour? +darkest = filter night by hour > 2 and hour < 5; + +dump darkest; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java index 1ecc72e..e07807c 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java @@ -141,6 +141,38 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer> return sb.toString(); } + public static class CompositeComponent + { + public AbstractType comparator; + public ByteBuffer value; + + public CompositeComponent( AbstractType comparator, ByteBuffer value ) + { + this.comparator = comparator; + this.value = value; + } + } + + public List<CompositeComponent> deconstruct( ByteBuffer bytes ) + { + List<CompositeComponent> list = new ArrayList<CompositeComponent>(); + + ByteBuffer bb = bytes.duplicate(); + int i = 0; + + while (bb.remaining() > 0) + { + AbstractType comparator = getNextComparator(i, bb); + ByteBuffer value = getWithShortLength(bb); + + list.add( new CompositeComponent(comparator,value) ); + + byte b = bb.get(); // Ignore; not relevant here + ++i; + } + return list; + } + /* * FIXME: this would break if some of the component string representation * contains ':'. None of our current comparator do so, so this is probably