Correctly decode row keys in widerow mode. Patch by brandonwilliams reviewed by aleksey for CASSANDRA-5098
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3034eeec Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3034eeec Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3034eeec Branch: refs/heads/cassandra-1.2 Commit: 3034eeecbfd2680556378fec64b73d190908c01b Parents: e1206f3 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Fri Jan 4 10:38:45 2013 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Fri Jan 4 10:38:45 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/hadoop/pig/CassandraStorage.java | 14 ++++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3034eeec/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 55a6bb5..c1c0930 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * fix multithreaded compaction deadlock (CASSANDRA-4492) * fix specifying and altering crc_check_chance (CASSANDRA-5053) * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079) + * Pig: correctly decode row keys in widerow mode (CASSANDRA-5098) 1.1.8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/3034eeec/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index c2f1c13..019682a 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -143,7 +143,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo if (tuple.size() == 0) // lastRow is a new one { key = (ByteBuffer)reader.getCurrentKey(); - tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); + addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class())); } for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) { @@ -179,7 +179,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo key = (ByteBuffer)reader.getCurrentKey(); if (lastKey != null && !(key.equals(lastKey))) // last key only had one value { - tuple.append(new DataByteArray(lastKey.array(), lastKey.position()+lastKey.arrayOffset(), lastKey.limit()+lastKey.arrayOffset())); + addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) { bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); @@ -189,7 +189,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); return tuple; } - tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); + addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); } SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); if (lastRow != null) // prepend what was read last time @@ -294,6 +294,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException { Tuple tuple = TupleFactory.getInstance().newTuple(1); + addKeyToTuple(tuple, key, cfDef, comparator); + return tuple; + } + + private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException + { if( comparator instanceof AbstractCompositeType ) { setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key)); @@ -302,7 +308,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key)); } - return tuple; + } private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException