Correctly decode row keys in widerow mode.
Patch by brandonwilliams reviewed by aleksey for CASSANDRA-5098


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3034eeec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3034eeec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3034eeec

Branch: refs/heads/cassandra-1.2
Commit: 3034eeecbfd2680556378fec64b73d190908c01b
Parents: e1206f3
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Fri Jan 4 10:38:45 2013 -0600
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Fri Jan 4 10:38:45 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/hadoop/pig/CassandraStorage.java     |   14 ++++++++++----
 2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3034eeec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 55a6bb5..c1c0930 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * fix multithreaded compaction deadlock (CASSANDRA-4492)
  * fix specifying and altering crc_check_chance (CASSANDRA-5053)
  * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079)
+ * Pig: correctly decode row keys in widerow mode (CASSANDRA-5098)
 
 
 1.1.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3034eeec/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index c2f1c13..019682a 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -143,7 +143,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                         if (tuple.size() == 0) // lastRow is a new one
                         {
                             key = (ByteBuffer)reader.getCurrentKey();
-                            tuple.append(new DataByteArray(key.array(), 
key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                            addKeyToTuple(tuple, key, cfDef, 
parseType(cfDef.getKey_validation_class()));
                         }
                         for (Map.Entry<ByteBuffer, IColumn> entry : 
lastRow.entrySet())
                         {
@@ -179,7 +179,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     key = (ByteBuffer)reader.getCurrentKey();
                     if (lastKey != null && !(key.equals(lastKey))) // last key 
only had one value
                     {
-                        tuple.append(new DataByteArray(lastKey.array(), 
lastKey.position()+lastKey.arrayOffset(), 
lastKey.limit()+lastKey.arrayOffset()));
+                        addKeyToTuple(tuple, lastKey, cfDef, 
parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, IColumn> entry : 
lastRow.entrySet())
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, 
parseType(cfDef.getComparator_type())));
@@ -189,7 +189,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                         lastRow = 
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                         return tuple;
                     }
-                    tuple.append(new DataByteArray(key.array(), 
key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                    addKeyToTuple(tuple, lastKey, cfDef, 
parseType(cfDef.getKey_validation_class()));
                 }
                 SortedMap<ByteBuffer,IColumn> row = 
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                 if (lastRow != null) // prepend what was read last time
@@ -294,6 +294,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType 
comparator) throws IOException
     {
         Tuple tuple = TupleFactory.getInstance().newTuple(1);
+        addKeyToTuple(tuple, key, cfDef, comparator);
+        return tuple;
+    }
+
+    private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, 
AbstractType comparator) throws IOException
+    {
         if( comparator instanceof AbstractCompositeType )
         {
             setTupleValue(tuple, 0, 
composeComposite((AbstractCompositeType)comparator,key));
@@ -302,7 +308,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         {
             setTupleValue(tuple, 0, 
getDefaultMarshallers(cfDef).get(2).compose(key));
         }
-        return tuple;
+
     }
 
     private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType 
comparator) throws IOException

Reply via email to