Merge branch 'cassandra-1.2' into cassandra-2.0 Conflicts: src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/678aa37a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/678aa37a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/678aa37a Branch: refs/heads/trunk Commit: 678aa37af3322e805a3a639890e18391ce22426f Parents: 1ff0d8e 70297f9 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Sep 10 13:44:47 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Tue Sep 10 13:44:47 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++----- .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++- 3 files changed, 135 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 22fa74b,4d5b446..f9a3b80 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -25,54 -9,14 +25,55 @@@ Merged from 1.2 * Fix streaming does not transfer wrapped range (CASSANDRA-5948) * Fix loading index summary containing empty key (CASSANDRA-5965) * Correctly handle limits in CompositesSearcher (CASSANDRA-5975) + * Pig: handle CQL collections (CASSANDRA-5867) -1.2.9 +2.0.0 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138) + * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931) + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928) + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938) + * Add stream session progress to JMX (CASSANDRA-4757) + * Fix NPE during CAS operation (CASSANDRA-5925) +Merged from 1.2: * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900) - * migrate 1.1 
schema_columnfamilies.key_alias column to key_aliases - (CASSANDRA-5800) - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831) + * Don't announce schema version until we've loaded the changes locally + (CASSANDRA-5904) + * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903) + * Properly handle parsing huge map and set literals (CASSANDRA-5893) + + +2.0.0-rc2 + * enable vnodes by default (CASSANDRA-5869) + * fix CAS contention timeout (CASSANDRA-5830) + * fix HsHa to respect max frame size (CASSANDRA-4573) + * Fix (some) 2i on composite components omissions (CASSANDRA-5851) + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880) +Merged from 1.2: + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855) + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868) + * cqlsh: add support for multiline comments (CASSANDRA-5798) + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns + (CASSANDRA-5856) + + +2.0.0-rc1 + * improve DecimalSerializer performance (CASSANDRA-5837) + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690) + * fix schema-related trigger issues (CASSANDRA-5774) + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138) + * Fix assertion error during repair (CASSANDRA-5801) + * Fix range tombstone bug (CASSANDRA-5805) + * DC-local CAS (CASSANDRA-5797) + * Add a native_protocol_version column to the system.local table (CASSANDRA-5819) + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822) + * Fix buffer underflow on socket close (CASSANDRA-5792) +Merged from 1.2: + * Fix reading DeletionTime from 1.1-format sstables (CASSANDRA-5814) + * cqlsh: add collections support to COPY (CASSANDRA-5698) + * retry important messages for any IOException (CASSANDRA-5804) + * Allow empty IN relations in SELECT/UPDATE/DELETE statements (CASSANDRA-5626) + * cqlsh: fix crashing on Windows due to libedit detection (CASSANDRA-5812) * 
fix bulk-loading compressed sstables (CASSANDRA-5820) * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter (CASSANDRA-5824) http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 84d7a7a,a73e5a5..2b76b83 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@@ -105,11 -107,12 +106,12 @@@ public class CqlStorage extends Abstrac ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate())); if (columnValue != null) { - IColumn column = new Column(cdef.name, columnValue); + Column column = new Column(cdef.name, columnValue); - tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance)); + AbstractType<?> validator = getValidatorMap(cfDef).get(column.name()); + setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator); } else - tuple.set(i, TupleFactory.getInstance().newTuple()); + tuple.set(i, null); i++; } return tuple; @@@ -120,6 -123,74 +122,74 @@@ } } + /** set the value to the position of the tuple */ + private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException + { + if (validator instanceof CollectionType) + setCollectionTupleValues(tuple, position, value, validator); + else + setTupleValue(tuple, position, value); + } + + /** set the values of set/list at and after the position of the tuple */ + private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException + { + if (validator instanceof MapType) + { + 
setMapTupleValues(tuple, position, value, validator); + return; + } + AbstractType<?> elementValidator; + if (validator instanceof SetType) + elementValidator = ((SetType<?>) validator).elements; + else if (validator instanceof ListType) + elementValidator = ((ListType<?>) validator).elements; + else + return; + + int i = 0; + Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size()); + for (Object entry : (Collection<?>) value) + { + setTupleValue(innerTuple, i, entry, elementValidator); + i++; + } + tuple.set(position, innerTuple); + } + + /** set the values of set/list at and after the position of the tuple */ + private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException + { + AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys; + AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values; + + int i = 0; + Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size()); + for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet()) + { + Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2); + setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator); + setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator); + innerTuple.set(i, mapEntryTuple); + i++; + } + tuple.set(position, innerTuple); + } + + /** convert a cql column to an object */ - private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException ++ private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException + { + // standard + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + if (validators.get(col.name()) == null) + { + Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); + return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value()); + } + else + return validators.get(col.name()).compose(col.value()); + } + /** set read configuration settings */ public void 
setLocation(String location, Job job) throws IOException { @@@ -455,8 -524,10 +525,8 @@@ private String partitionFilterToWhereClauseString(Expression expression) { Expression.BinaryExpression be = (Expression.BinaryExpression) expression; - String name = be.getLhs().toString(); - String value = be.getRhs().toString(); OpType op = expression.getOpType(); - String opString = op.name(); + String opString = op.toString(); switch (op) { case OP_EQ: