Updated Branches: refs/heads/cassandra-2.0 a2b12784f -> dccc91db0 refs/heads/trunk b77fdcc3d -> 3f918e233
Add more data type mappings for pig. Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6128 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdb7bb16 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdb7bb16 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdb7bb16 Branch: refs/heads/cassandra-2.0 Commit: bdb7bb16facda0fbe266390bd3213f092d02c0dc Parents: 9d31ac1 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Mon Oct 7 13:57:45 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Mon Oct 7 14:03:08 2013 -0500 ---------------------------------------------------------------------- .../hadoop/pig/AbstractCassandraStorage.java | 30 +++++++++++++++----- .../cassandra/hadoop/pig/CassandraStorage.java | 2 +- .../apache/cassandra/hadoop/pig/CqlStorage.java | 7 ++--- 3 files changed, 26 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb7bb16/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index ce92014..6ad4f9e 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -110,7 +110,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store List<CompositeComponent> result = comparator.deconstruct(name); Tuple t = TupleFactory.getInstance().newTuple(result.size()); for (int i=0; i<result.size(); i++) - setTupleValue(t, i, result.get(i).comparator.compose(result.get(i).value)); + setTupleValue(t, i, cassandraToObj(result.get(i).comparator, result.get(i).value)); return t; } @@ -124,7 +124,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store if(comparator instanceof AbstractCompositeType) setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name())); else - setTupleValue(pair, 0, comparator.compose(col.name())); + setTupleValue(pair, 0, cassandraToObj(comparator, col.name())); // value if (col instanceof Column) @@ -134,10 +134,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store if (validators.get(col.name()) == null) { Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - setTupleValue(pair, 1, marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value())); + setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value())); } else - setTupleValue(pair, 1, validators.get(col.name()).compose(col.value())); + setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value())); return pair; } else @@ -327,9 +327,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store return DataType.LONG; else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger return DataType.INTEGER; - else if (type instanceof AsciiType) - return DataType.CHARARRAY; - else if (type instanceof UTF8Type) + else if (type instanceof AsciiType || + type instanceof UTF8Type || + type instanceof DecimalType || + type instanceof InetAddressType || + type instanceof LexicalUUIDType || + type instanceof UUIDType ) return DataType.CHARARRAY; else if (type instanceof FloatType) return DataType.FLOAT; @@ -772,5 +775,18 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store } return null; } + + protected Object cassandraToObj(AbstractType validator, ByteBuffer value) + { + if (validator instanceof DecimalType || + validator instanceof InetAddressType || + validator instanceof LexicalUUIDType || + validator instanceof UUIDType) + { + return validator.getString(value); + } + else + return validator.compose(value); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb7bb16/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 09171a0..1135b70 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -731,7 +731,7 @@ public class CassandraStorage extends AbstractCassandraStorage } else { - setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key)); + setTupleValue(tuple, 0, cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb7bb16/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 79abc2c..e51338c 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -189,12 +189,9 @@ public class CqlStorage extends AbstractCassandraStorage // standard Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); if (validators.get(col.name()) == null) - { - Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value()); - } + return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value()); else - return validators.get(col.name()).compose(col.value()); + return cassandraToObj(validators.get(col.name()), col.value()); } /** set read configuration settings */