Author: brandonwilliams Date: Wed Sep 28 22:01:08 2011 New Revision: 1177084
URL: http://svn.apache.org/viewvc?rev=1177084&view=rev Log: Fix handling of integer types in pig. Patch by brandonwilliams, reviewed by Jeremy Hanna for CASSANDRA-2810 Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1177084&r1=1177083&r2=1177084&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Sep 28 22:01:08 2011 @@ -17,11 +17,13 @@ package org.apache.cassandra.hadoop.pig; import java.io.IOException; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.FBUtilities; @@ -143,18 +145,14 @@ public class CassandraStorage extends Lo List<AbstractType> marshallers = getDefaultMarshallers(cfDef); Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + setTupleValue(pair, 0, marshallers.get(0).compose(name)); if (col instanceof Column) { // standard - pair.set(0, marshallers.get(0).compose(name)); if (validators.get(name) == null) - // Have to special case BytesType because compose returns a ByteBuffer - if (marshallers.get(1) instanceof BytesType) - pair.set(1, new DataByteArray(ByteBufferUtil.getArray(col.value()))); - else - pair.set(1, marshallers.get(1).compose(col.value())); + setTupleValue(pair, 1, marshallers.get(1).compose(col.value())); else - pair.set(1, validators.get(name).compose(col.value())); + setTupleValue(pair, 1, validators.get(name).compose(col.value())); return pair; } @@ -167,6 +165,16 @@ public class CassandraStorage extends Lo return pair; } + private void setTupleValue(Tuple pair, int position, Object value) throws ExecException + { + if (value instanceof BigInteger) + pair.set(position, ((BigInteger) value).intValue()); + else if (value instanceof ByteBuffer) + pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value))); + else + pair.set(position, value); + } + private CfDef getCfDef(String signature) { UDFContext context = UDFContext.getUDFContext(); @@ -453,8 +461,6 @@ public class CassandraStorage extends Lo DefaultDataBag pairs = (DefaultDataBag) t.get(1); ArrayList<Mutation> mutationList = new ArrayList<Mutation>(); CfDef cfDef = getCfDef(storeSignature); - List<AbstractType> marshallers = getDefaultMarshallers(cfDef); - Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); try { for (Tuple pair : pairs) @@ -498,15 +504,8 @@ public class CassandraStorage extends Lo else { org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column(); - column.name = marshallers.get(0).decompose((pair.get(0))); - if (validators.get(column.name) == null) - // Have to special case BytesType to convert DataByteArray into ByteBuffer - if (marshallers.get(1) instanceof BytesType) - column.value = objToBB(pair.get(1)); - else - column.value = marshallers.get(1).decompose(pair.get(1)); - else - column.value = validators.get(column.name).decompose(pair.get(1)); + column.name = objToBB(pair.get(0)); + column.value = objToBB(pair.get(1)); column.setTimestamp(System.currentTimeMillis() * 1000); mutation.column_or_supercolumn = new ColumnOrSuperColumn(); mutation.column_or_supercolumn.column = column; @@ -626,3 +625,4 @@ public class CassandraStorage extends Lo return cfDef; } } +