Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
        src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/678aa37a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/678aa37a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/678aa37a

Branch: refs/heads/trunk
Commit: 678aa37af3322e805a3a639890e18391ce22426f
Parents: 1ff0d8e 70297f9
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Tue Sep 10 13:44:47 2013 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Tue Sep 10 13:44:47 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/pig/AbstractCassandraStorage.java    | 78 +++++++++++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
 3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 22fa74b,4d5b446..f9a3b80
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,54 -9,14 +25,55 @@@ Merged from 1.2
   * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
   * Fix loading index summary containing empty key (CASSANDRA-5965)
   * Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+  * Pig: handle CQL collections (CASSANDRA-5867)
  
  
 -1.2.9
 +2.0.0
 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138)
 + * Fix periodic memtable flushing behavior with clean memtables 
(CASSANDRA-5931)
 + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928)
 + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938)
 + * Add stream session progress to JMX (CASSANDRA-4757)
 + * Fix NPE during CAS operation (CASSANDRA-5925)
 +Merged from 1.2:
   * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900)
 - * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases
 -   (CASSANDRA-5800)
 - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831)
 + * Don't announce schema version until we've loaded the changes locally
 +   (CASSANDRA-5904)
 + * Fix to support off heap bloom filters size greater than 2 GB 
(CASSANDRA-5903)
 + * Properly handle parsing huge map and set literals (CASSANDRA-5893)
 +
 +
 +2.0.0-rc2
 + * enable vnodes by default (CASSANDRA-5869)
 + * fix CAS contention timeout (CASSANDRA-5830)
 + * fix HsHa to respect max frame size (CASSANDRA-4573)
 + * Fix (some) 2i on composite components omissions (CASSANDRA-5851)
 + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880)
 +Merged from 1.2:
 + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
 + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868)
 + * cqlsh: add support for multiline comments (CASSANDRA-5798)
 + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns
 +   (CASSANDRA-5856)
 +
 +
 +2.0.0-rc1
 + * improve DecimalSerializer performance (CASSANDRA-5837)
 + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690)
 + * fix schema-related trigger issues (CASSANDRA-5774)
 + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138)
 + * Fix assertion error during repair (CASSANDRA-5801)
 + * Fix range tombstone bug (CASSANDRA-5805)
 + * DC-local CAS (CASSANDRA-5797)
 + * Add a native_protocol_version column to the system.local table 
(CASSANDRA-5819)
 + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822)
 + * Fix buffer underflow on socket close (CASSANDRA-5792)
 +Merged from 1.2:
 + * Fix reading DeletionTime from 1.1-format sstables (CASSANDRA-5814)
 + * cqlsh: add collections support to COPY (CASSANDRA-5698)
 + * retry important messages for any IOException (CASSANDRA-5804)
 + * Allow empty IN relations in SELECT/UPDATE/DELETE statements 
(CASSANDRA-5626)
 + * cqlsh: fix crashing on Windows due to libedit detection (CASSANDRA-5812)
   * fix bulk-loading compressed sstables (CASSANDRA-5820)
   * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter 
     (CASSANDRA-5824)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 84d7a7a,a73e5a5..2b76b83
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@@ -105,11 -107,12 +106,12 @@@ public class CqlStorage extends Abstrac
                  ByteBuffer columnValue = 
columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
                  if (columnValue != null)
                  {
 -                    IColumn column = new Column(cdef.name, columnValue);
 +                    Column column = new Column(cdef.name, columnValue);
-                     tuple.set(i, columnToTuple(column, cfDef, 
UTF8Type.instance));
+                     AbstractType<?> validator = 
getValidatorMap(cfDef).get(column.name());
+                     setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), 
validator);
                  }
                  else
-                     tuple.set(i, TupleFactory.getInstance().newTuple());
+                     tuple.set(i, null);
                  i++;
              }
              return tuple;
@@@ -120,6 -123,74 +122,74 @@@
          }
      }
  
+     /** set the value to the position of the tuple */
+     private void setTupleValue(Tuple tuple, int position, Object value, 
AbstractType<?> validator) throws ExecException
+     {
+         if (validator instanceof CollectionType)
+             setCollectionTupleValues(tuple, position, value, validator);
+         else
+            setTupleValue(tuple, position, value);
+     }
+ 
+     /** set the values of set/list at and after the position of the tuple */
+     private void setCollectionTupleValues(Tuple tuple, int position, Object 
value, AbstractType<?> validator) throws ExecException
+     {
+         if (validator instanceof MapType)
+         {
+             setMapTupleValues(tuple, position, value, validator);
+             return;
+         }
+         AbstractType<?> elementValidator;
+         if (validator instanceof SetType)
+             elementValidator = ((SetType<?>) validator).elements;
+         else if (validator instanceof ListType)
+             elementValidator = ((ListType<?>) validator).elements;
+         else 
+             return;
+         
+         int i = 0;
+         Tuple innerTuple = 
TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+         for (Object entry : (Collection<?>) value)
+         {
+             setTupleValue(innerTuple, i, entry, elementValidator);
+             i++;
+         }
+         tuple.set(position, innerTuple);
+     }
+ 
+     /** set the key/value pairs of a map, as a tuple of 2-element tuples, at the position of the tuple */
+     private void setMapTupleValues(Tuple tuple, int position, Object value, 
AbstractType<?> validator) throws ExecException
+     {
+         AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+         AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+         
+         int i = 0;
+         Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) 
value).size());
+         for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
+         {
+             Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+             setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+             setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+             innerTuple.set(i, mapEntryTuple);
+             i++;
+         }
+         tuple.set(position, innerTuple);
+     }
+ 
+     /** convert a cql column to an object */
 -    private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
++    private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
+     {
+         // standard
+         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+         if (validators.get(col.name()) == null)
+         {
+             Map<MarshallerType, AbstractType> marshallers = 
getDefaultMarshallers(cfDef);
+             return 
marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+         }
+         else
+             return validators.get(col.name()).compose(col.value());
+     }
+ 
      /** set read configuration settings */
      public void setLocation(String location, Job job) throws IOException
      {
@@@ -455,8 -524,10 +525,8 @@@
      private String partitionFilterToWhereClauseString(Expression expression)
      {
          Expression.BinaryExpression be = (Expression.BinaryExpression) 
expression;
 -        String name = be.getLhs().toString();
 -        String value = be.getRhs().toString();
          OpType op = expression.getOpType();
-         String opString = op.name();
+         String opString = op.toString();
          switch (op)
          {
              case OP_EQ:

Reply via email to