Merge branch 'cassandra-1.2' into cassandra-2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3ad2e82 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3ad2e82 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3ad2e82 Branch: refs/heads/cassandra-2.0 Commit: a3ad2e82249b88d4a05f24140948cdbc809d14f3 Parents: 8a506e6 639c01a Author: Brandon Williams <brandonwilli...@apache.org> Authored: Fri Oct 11 15:30:27 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Fri Oct 11 15:30:27 2013 -0500 ---------------------------------------------------------------------- .../hadoop/pig/AbstractCassandraStorage.java | 97 ++++++++++++++------ .../cassandra/hadoop/pig/CassandraStorage.java | 55 +++++++---- .../apache/cassandra/hadoop/pig/CqlStorage.java | 8 +- 3 files changed, 109 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index c881734,dbebfb5..486c781 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@@ -127,14 -128,36 +128,37 @@@ public abstract class AbstractCassandra setTupleValue(pair, 0, cassandraToObj(comparator, col.name())); // value - if (col instanceof Column) + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + if (validators.get(col.name()) == null) { - Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value())); + // standard + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + ByteBuffer colName; + if (cfInfo.cql3Table && !cfInfo.compactCqlTable) + { + ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name()); + colName = names[names.length-1]; + } + else + colName = col.name(); + if (validators.get(colName) == null) + { + Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); + setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value())); + } + else + setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value())); + return pair; } else - setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value())); + { + // super + ArrayList<Tuple> subcols = new ArrayList<Tuple>(); + for (IColumn subcol : col.getSubColumns()) + subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type()))); + + pair.set(1, new DefaultDataBag(subcols)); + } return pair; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 4083236,a7cc1ad..d9c55a1 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@@ -124,9 -123,9 +125,9 @@@ public class CassandraStorage extends A key = (ByteBuffer)reader.getCurrentKey(); tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class())); } - for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) + for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet()) { - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); } lastKey = null; lastRow = null; @@@ -162,9 -161,9 +163,9 @@@ tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class())); else addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); - for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) + for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet()) { - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); } tuple.append(bag); lastKey = key; @@@ -176,19 -175,19 +177,19 @@@ else addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); } - SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); + SortedMap<ByteBuffer, Column> row = (SortedMap<ByteBuffer, Column>)reader.getCurrentValue(); if (lastRow != null) // prepend what was read last time { - for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) + for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet()) { - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); } lastKey = null; lastRow = null; } - for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet()) + for (Map.Entry<ByteBuffer, Column> entry : row.entrySet()) { - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); } } } @@@ -210,9 -209,10 +211,10 @@@ if (!reader.nextKeyValue()) return null; - CfDef cfDef = getCfDef(loadSignature); + CfInfo cfInfo = getCfInfo(loadSignature); + CfDef cfDef = cfInfo.cfDef; ByteBuffer key = reader.getCurrentKey(); - Map<ByteBuffer, IColumn> cf = reader.getCurrentValue(); + Map<ByteBuffer, Column> cf = reader.getCurrentValue(); assert key != null && cf != null; // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest @@@ -237,10 -247,10 +249,10 @@@ added.put(cdef.name, true); } // now add all the other columns - for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) + for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet()) { if (!added.containsKey(entry.getKey())) - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); } tuple.append(bag); // finally, special top-level indexes if needed http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ----------------------------------------------------------------------