Merge branch 'cassandra-1.2' into cassandra-2.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3ad2e82
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3ad2e82
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3ad2e82

Branch: refs/heads/cassandra-2.0
Commit: a3ad2e82249b88d4a05f24140948cdbc809d14f3
Parents: 8a506e6 639c01a
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Fri Oct 11 15:30:27 2013 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Fri Oct 11 15:30:27 2013 -0500

----------------------------------------------------------------------
 .../hadoop/pig/AbstractCassandraStorage.java    | 97 ++++++++++++++------
 .../cassandra/hadoop/pig/CassandraStorage.java  | 55 +++++++----
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  8 +-
 3 files changed, 109 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index c881734,dbebfb5..486c781
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@@ -127,14 -128,36 +128,37 @@@ public abstract class AbstractCassandra
              setTupleValue(pair, 0, cassandraToObj(comparator, col.name()));
  
          // value
 -        if (col instanceof Column)
 +        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 +        if (validators.get(col.name()) == null)
          {
-             Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-             setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
+             // standard
+             Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+             ByteBuffer colName;
+             if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
+             {
+                 ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
+                 colName = names[names.length-1];
+             }
+             else
+                 colName = col.name();
+             if (validators.get(colName) == null)
+             {
+                 Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+                 setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
+             }
+             else
+                 setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
+             return pair;
          }
          else
-             setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
+         {
+             // super
+             ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+             for (IColumn subcol : col.getSubColumns())
+                 subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
+ 
+             pair.set(1, new DefaultDataBag(subcols));
+         }
          return pair;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 4083236,a7cc1ad..d9c55a1
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -124,9 -123,9 +125,9 @@@ public class CassandraStorage extends A
                              key = (ByteBuffer)reader.getCurrentKey();
                              tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
                          }
 -                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                        for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                          {
-                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                             bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                          }
                          lastKey = null;
                          lastRow = null;
@@@ -162,9 -161,9 +163,9 @@@
                              tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                          else
                              addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
 -                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                        for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                          {
-                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                             bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                          }
                          tuple.append(bag);
                          lastKey = key;
@@@ -176,19 -175,19 +177,19 @@@
                      else
                          addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                  }
 -                SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
 +                SortedMap<ByteBuffer, Column> row = (SortedMap<ByteBuffer, Column>)reader.getCurrentValue();
                  if (lastRow != null) // prepend what was read last time
                  {
 -                    for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                    for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                      {
-                         bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                         bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                      }
                      lastKey = null;
                      lastRow = null;
                  }
 -                for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
 +                for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
                  {
-                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                     bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                  }
              }
          }
@@@ -210,9 -209,10 +211,10 @@@
              if (!reader.nextKeyValue())
                  return null;
  
-             CfDef cfDef = getCfDef(loadSignature);
+             CfInfo cfInfo = getCfInfo(loadSignature);
+             CfDef cfDef = cfInfo.cfDef;
              ByteBuffer key = reader.getCurrentKey();
 -            Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
 +            Map<ByteBuffer, Column> cf = reader.getCurrentValue();
              assert key != null && cf != null;
  
              // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@@ -237,10 -247,10 +249,10 @@@
                  added.put(cdef.name, true);
              }
              // now add all the other columns
 -            for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
 +            for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
              {
                  if (!added.containsKey(entry.getKey()))
-                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                     bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
              }
              tuple.append(bag);
              // finally, special top-level indexes if needed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------

Reply via email to