Correctly decode row keys in widerow mode. Patch by brandonwilliams; reviewed by aleksey for CASSANDRA-5098.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/607dd90c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/607dd90c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/607dd90c Branch: refs/heads/trunk Commit: 607dd90c2504cbb9c2f71e53c6154361d4eda87d Parents: b41fc77 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Fri Jan 4 10:38:45 2013 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Fri Jan 4 11:12:29 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/hadoop/pig/CassandraStorage.java | 14 ++++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/607dd90c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 19cd217..635a04c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -49,6 +49,7 @@ Merged from 1.1: * fix specifying and altering crc_check_chance (CASSANDRA-5053) * fix Murmur3Partitioner ownership% calculation (CASSANDRA-5076) * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079) + * Pig: correctly decode row keys in widerow mode (CASSANDRA-5098) 1.2-rc1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/607dd90c/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 91174f3..34af1f4 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -144,7 +144,7 @@ public class CassandraStorage extends LoadFunc implements 
StoreFuncInterface, Lo if (tuple.size() == 0) // lastRow is a new one { key = (ByteBuffer)reader.getCurrentKey(); - tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); + addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class())); } for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet()) { @@ -180,7 +180,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo key = (ByteBuffer)reader.getCurrentKey(); if (lastKey != null && !(key.equals(lastKey))) // last key only had one value { - tuple.append(new DataByteArray(lastKey.array(), lastKey.position()+lastKey.arrayOffset(), lastKey.limit()+lastKey.arrayOffset())); + addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet()) { bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); @@ -190,7 +190,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo lastRow = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue(); return tuple; } - tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); + addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); } SortedMap<ByteBuffer,Column> row = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue(); if (lastRow != null) // prepend what was read last time @@ -295,6 +295,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException { Tuple tuple = TupleFactory.getInstance().newTuple(1); + addKeyToTuple(tuple, key, cfDef, comparator); + return tuple; + } + + private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException + { if( comparator instanceof AbstractCompositeType ) { 
setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key)); @@ -303,7 +309,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key)); } - return tuple; + } private Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException