Author: brandonwilliams
Date: Fri Oct 14 21:57:29 2011
New Revision: 1183518

URL: http://svn.apache.org/viewvc?rev=1183518&view=rev
Log:
Add 0.8+ types and key validation type to pig schema.
Patch by Steeve Morin, reviewed by brandonwilliams for CASSANDRA-3280

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1183518&r1=1183517&r2=1183518&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Oct 14 21:57:29 2011
@@ -15,6 +15,8 @@
  * (Hadoop) accept comma delimited lists of initial thrift connections
    (CASSANDRA-3185)
  * ColumnFamily min_compaction_threshold should be >= 2 (CASSANDRA-3342)
+ * (Pig) add 0.8+ types and key validation type in schema (CASSANDRA-3280)
+
 
 0.8.7
  * Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201)

Modified: 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1183518&r1=1183517&r2=1183518&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 Fri Oct 14 21:57:29 2011
@@ -107,7 +107,7 @@ public class CassandraStorage extends Lo
         return limit;
     }
 
-       @Override
+    @Override
     public Tuple getNext() throws IOException
     {
         try
@@ -122,7 +122,7 @@ public class CassandraStorage extends Lo
             assert key != null && cf != null;
             
             // and wrap it in a tuple
-               Tuple tuple = TupleFactory.getInstance().newTuple(2);
+            Tuple tuple = TupleFactory.getInstance().newTuple(2);
             ArrayList<Tuple> columns = new ArrayList<Tuple>();
             tuple.set(0, new DataByteArray(key.array(), 
key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
             for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
@@ -187,10 +187,12 @@ public class CassandraStorage extends Lo
         ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
         AbstractType comparator = null;
         AbstractType default_validator = null;
+        AbstractType key_validator = null;
         try
         {
-            comparator = TypeParser.parse(cfDef.comparator_type);
-            default_validator = 
TypeParser.parse(cfDef.default_validation_class);
+            comparator = TypeParser.parse(cfDef.getComparator_type());
+            default_validator = 
TypeParser.parse(cfDef.getDefault_validation_class());
+            key_validator = TypeParser.parse(cfDef.getKey_validation_class());
         }
         catch (ConfigurationException e)
         {
@@ -199,13 +201,14 @@ public class CassandraStorage extends Lo
 
         marshallers.add(comparator);
         marshallers.add(default_validator);
+        marshallers.add(key_validator);
         return marshallers;
     }
 
-    private Map<ByteBuffer,AbstractType> getValidatorMap(CfDef cfDef) throws  
IOException
+    private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws 
IOException
     {
         Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, 
AbstractType>();
-        for (ColumnDef cd : cfDef.column_metadata)
+        for (ColumnDef cd : cfDef.getColumn_metadata())
         {
             if (cd.getValidation_class() != null && 
!cd.getValidation_class().isEmpty())
             {
@@ -236,6 +239,18 @@ public class CassandraStorage extends Lo
         this.reader = reader;
     }
 
+    public static Map<String, String> getQueryMap(String query)
+    {
+        String[] params = query.split("&");
+        Map<String, String> map = new HashMap<String, String>();
+        for (String param : params)
+        {
+            String[] keyValue = param.split("=");
+            map.put(keyValue[0], keyValue[1]);
+        }
+        return map;
+    }
+
     private void setLocationFromUri(String location) throws IOException
     {
         // parse uri into keyspace and columnfamily
@@ -247,18 +262,18 @@ public class CassandraStorage extends Lo
             String[] urlParts = location.split("\\?");
             if (urlParts.length > 1)
             {
-                for (String param : urlParts[1].split("&"))
-                {
-                    String[] pair = param.split("=");
-                    if (pair[0].equals("slice_start"))
-                        slice_start = ByteBufferUtil.bytes(pair[1]);
-                    else if (pair[0].equals("slice_end"))
-                        slice_end = ByteBufferUtil.bytes(pair[1]);
-                    else if (pair[0].equals("reversed"))
-                        slice_reverse = Boolean.parseBoolean(pair[1]);
-                    else if (pair[0].equals("limit"))
-                        limit = Integer.parseInt(pair[1]);
-                }
+                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+                AbstractType comparator = BytesType.instance;
+                if (urlQuery.containsKey("comparator"))
+                    comparator = TypeParser.parse(urlQuery.get("comparator"));
+                if (urlQuery.containsKey("slice_start"))
+                    slice_start = 
comparator.fromString(urlQuery.get("slice_start"));
+                if (urlQuery.containsKey("slice_end"))
+                    slice_end = 
comparator.fromString(urlQuery.get("slice_end"));
+                if (urlQuery.containsKey("reversed"))
+                    slice_reverse = 
Boolean.parseBoolean(urlQuery.get("reversed"));
+                if (urlQuery.containsKey("limit"))
+                    limit = Integer.parseInt(urlQuery.get("limit"));
             }
             String[] parts = urlParts[0].split("/+");
             keyspace = parts[1];
@@ -312,10 +327,14 @@ public class CassandraStorage extends Lo
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
 
+        // get default marshallers and validators
+        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+
         // add key
         ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
         keyFieldSchema.setName("key");
-        keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
+        keyFieldSchema.setType(getPigType(marshallers.get(2)));
 
         // will become the bag of tuples
         ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
@@ -323,9 +342,6 @@ public class CassandraStorage extends Lo
         bagFieldSchema.setType(DataType.BAG);
         ResourceSchema bagSchema = new ResourceSchema();
 
-
-        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
-        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         List<ResourceFieldSchema> tupleFields = new 
ArrayList<ResourceFieldSchema>();
 
         // default comparator/validator
@@ -381,6 +397,10 @@ public class CassandraStorage extends Lo
             return DataType.CHARARRAY;
         else if (type instanceof UTF8Type)
             return DataType.CHARARRAY;
+        else if (type instanceof FloatType)
+            return DataType.FLOAT;
+        else if (type instanceof DoubleType)
+            return DataType.DOUBLE;
         return DataType.BYTEARRAY;
     }
 


Reply via email to