Updated Branches: refs/heads/cassandra-1.1 0db940695 -> 2dd73d171
Fix 5488 round 2 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2dd73d17 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2dd73d17 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2dd73d17 Branch: refs/heads/cassandra-1.1 Commit: 2dd73d171068d743befcd445a14751032d232e4e Parents: 0db9406 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed May 22 11:18:59 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed May 22 11:19:05 2013 -0500 ---------------------------------------------------------------------- .../cassandra/hadoop/pig/CassandraStorage.java | 34 ++++++++++----- 1 files changed, 23 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2dd73d17/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index b681ee3..cf1c08f 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -130,7 +130,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { CfDef cfDef = getCfDef(loadSignature); ByteBuffer key = null; - Tuple tuple = TupleFactory.getInstance().newTuple(); + Tuple tuple = null; DefaultDataBag bag = new DefaultDataBag(); try { @@ -139,12 +139,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo hasNext = reader.nextKeyValue(); if (!hasNext) { + if (tuple == null) + tuple = TupleFactory.getInstance().newTuple(); + if (lastRow != null) { if (tuple.size() == 0) // lastRow is a new one { key = (ByteBuffer)reader.getCurrentKey(); - tuple = addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class())); + tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class())); } for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) { @@ -180,7 +183,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo key = (ByteBuffer)reader.getCurrentKey(); if (lastKey != null && !(key.equals(lastKey))) // last key only had one value { - tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); + if (tuple == null) + tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class())); + else + addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) { bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); @@ -190,7 +196,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); return tuple; } - tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); + if (tuple == null) + tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class())); + else + addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); } SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); if (lastRow != null) // prepend what was read last time @@ -233,7 +242,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it - Tuple tuple = addKeyToTuple(null, key, cfDef, parseType(cfDef.getKey_validation_class())); + Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class())); DefaultDataBag bag = new DefaultDataBag(); // we must add all the indexed columns first to match the schema @@ -292,12 +301,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo return t; } - private Tuple addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException + private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException + { + Tuple tuple = TupleFactory.getInstance().newTuple(1); + addKeyToTuple(tuple, key, cfDef, comparator); + return tuple; + } + + private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException { - if( tuple == null ) - { - tuple = TupleFactory.getInstance().newTuple(1); - } if( comparator instanceof AbstractCompositeType ) { setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key)); @@ -306,7 +318,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key)); } - return tuple; + } private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException