http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 0a64c87..1ad80b7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -18,30 +18,46 @@
 package org.apache.cassandra.hadoop.pig;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.db.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.auth.PasswordAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
@@ -52,7 +68,8 @@ import org.apache.thrift.protocol.TBinaryProtocol;
  *
 * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
  */
-public class CassandraStorage extends AbstractCassandraStorage
+@Deprecated
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
 {
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
     public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
@@ -71,6 +88,28 @@ public class CassandraStorage extends AbstractCassandraStorage
 
     private boolean widerows = false;
     private int limit;
+
+    protected String DEFAULT_INPUT_FORMAT;
+    protected String DEFAULT_OUTPUT_FORMAT;
+
+    protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
+
+    protected String username;
+    protected String password;
+    protected String keyspace;
+    protected String column_family;
+    protected String loadSignature;
+    protected String storeSignature;
+
+    protected Configuration conf;
+    protected String inputFormatClass;
+    protected String outputFormatClass;
+    protected int splitSize = 64 * 1024;
+    protected String partitionerClass;
+    protected boolean usePartitionFilter = false;
+    protected String initHostAddress;
+    protected String rpcPort;
+    protected int nativeProtocolVersion = 1;
     
     // wide row hacks
     private ByteBuffer lastKey;
@@ -104,8 +143,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     /** read wide row*/
     public Tuple getNextWide() throws IOException
     {
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
         ByteBuffer key = null;
         Tuple tuple = null; 
         DefaultDataBag bag = new DefaultDataBag();
@@ -128,7 +166,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                         }
                         for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         lastKey = null;
                         lastRow = null;
@@ -166,7 +204,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                             addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         tuple.append(bag);
                         lastKey = key;
@@ -183,14 +221,14 @@ public class CassandraStorage extends AbstractCassandraStorage
                 {
                     for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
                     {
-                        bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                     }
                     lastKey = null;
                     lastRow = null;
                 }
                 for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet())
                 {
-                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                 }
             }
         }
@@ -200,7 +238,6 @@ public class CassandraStorage extends AbstractCassandraStorage
         }
     }
 
-    @Override
     /** read next row */
     public Tuple getNext() throws IOException
     {
@@ -212,8 +249,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
+            CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = reader.getCurrentKey();
             Map<ByteBuffer, Cell> cf = reader.getCurrentValue();
             assert key != null && cf != null;
@@ -240,7 +276,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                 }
                 if (hasColumn)
                 {
-                    tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
+                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
                 }
                 else if (!cql3Table)
                 {   // otherwise, we need to add an empty tuple to take its place
@@ -252,7 +288,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet())
             {
                 if (!added.containsKey(entry.getKey()))
-                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
             tuple.append(bag);
             // finally, special top-level indexes if needed
@@ -260,7 +296,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             {
                 for (ColumnDef cdef : getIndexes())
                 {
-                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
+                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
                     tuple.append(throwaway.get(1));
                 }
             }
@@ -272,14 +308,57 @@ public class CassandraStorage extends AbstractCassandraStorage
         }
     }
 
+    /** write next row */
+    public void putNext(Tuple t) throws IOException
+    {
+        /*
+        We support two cases for output:
+        First, the original output:
+            (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
+        For supers, we only accept the original output.
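+        For example: (rowkey, (colA, valA), (colB, valB), {(colC, valC)})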
+        */
+
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped, filter empty tuples to suppress 
this warning");
+            return;
+        }
+        ByteBuffer key = objToBB(t.get(0));
+        if (t.getType(1) == DataType.TUPLE)
+            writeColumnsFromTuple(key, t, 1);
+        else if (t.getType(1) == DataType.BAG)
+        {
+            if (t.size() > 2)
+                throw new IOException("No arguments allowed after bag");
+            writeColumnsFromBag(key, (DataBag) t.get(1));
+        }
+        else
+            throw new IOException("Second argument in output must be a tuple 
or bag");
+    }
+
     /** set hadoop cassandra connection settings */
     protected void setConnectionInformation() throws IOException
     {
-        super.setConnectionInformation();
+        StorageHelper.setConnectionInformation(conf);
+        if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
+            inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
+        else
+            inputFormatClass = DEFAULT_INPUT_FORMAT;
+        if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
+            outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
+        else
+            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
         if (System.getenv(PIG_ALLOW_DELETES) != null)
             allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES));
     }
 
+    /** get the full class name */
+    protected String getFullyQualifiedClassName(String classname)
+    {
+        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -296,11 +375,11 @@ public class CassandraStorage extends AbstractCassandraStorage
             widerows = Boolean.parseBoolean(System.getenv(PIG_WIDEROW_INPUT));
         if (System.getenv(PIG_USE_SECONDARY) != null)
             usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY));
-        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
             {
-                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
             }
             catch (NumberFormatException e)
             {
@@ -380,12 +459,67 @@ public class CassandraStorage extends AbstractCassandraStorage
         initSchema(storeSignature);
     }
 
+    /** Get the column family schema from Cassandra and cache it in the UDF context */
+    protected void initSchema(String signature) throws IOException
+    {
+        Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
+
+        // Only get the schema if we haven't already gotten it
+        if (!properties.containsKey(signature))
+        {
+            try
+            {
+                Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
+                client.set_keyspace(keyspace);
+
+                if (username != null && password != null)
+                {
+                    Map<String, String> credentials = new HashMap<String, String>(2);
+                    credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
+                    credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
+
+                    try
+                    {
+                        client.login(new AuthenticationRequest(credentials));
+                    }
+                    catch (AuthenticationException e)
+                    {
+                        logger.error("Authentication exception: invalid 
username and/or password");
+                        throw new IOException(e);
+                    }
+                }
+
+                // compose the CfDef for the columnfamily
+                CfDef cfDef = getCfDef(client);
+
+                if (cfDef != null)
+                {
+                    StringBuilder sb = new StringBuilder();
+                    sb.append(cfdefToString(cfDef));
+                    properties.setProperty(signature, sb.toString());
+                }
+                else
+                    throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
+                            column_family,
+                            keyspace));
+            }
+            catch (Exception e)
+            {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    public void checkSchema(ResourceSchema schema) throws IOException
+    {
+        // we don't care about types, they all get casted to ByteBuffers
+    }
+
     /** define the schema */
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
         if (cfDef.column_type.equals("Super"))
             return null;
         /*
@@ -405,7 +539,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         // add key
         ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
         keyFieldSchema.setName("key");
-        keyFieldSchema.setType(getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
+        keyFieldSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
 
         ResourceSchema bagSchema = new ResourceSchema();
         ResourceFieldSchema bagField = new ResourceFieldSchema();
@@ -419,8 +553,8 @@ public class CassandraStorage extends AbstractCassandraStorage
         ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
         bagcolSchema.setName("name");
         bagvalSchema.setName("value");
-        bagcolSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
-        bagvalSchema.setType(getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
+        bagcolSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+        bagvalSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
         bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
         bagTupleField.setSchema(bagTupleSchema);
         bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
@@ -431,7 +565,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         // add the key first, then the indexed columns, and finally the bag
         allSchemaFields.add(keyFieldSchema);
 
-        if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
+        if (!widerows)
         {
             // defined validators/indexes
             for (ColumnDef cdef : cfDef.column_metadata)
@@ -445,14 +579,14 @@ public class CassandraStorage extends AbstractCassandraStorage
 
                 ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
                 idxColSchema.setName("name");
-                idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+                idxColSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
 
                 ResourceFieldSchema valSchema = new ResourceFieldSchema();
                 AbstractType validator = validators.get(cdef.name);
                 if (validator == null)
                     validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
                 valSchema.setName("value");
-                valSchema.setType(getPigType(validator));
+                valSchema.setType(StorageHelper.getPigType(validator));
 
                 innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
                 allSchemaFields.add(innerTupleField);
@@ -472,7 +606,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                 AbstractType validator = validators.get(cdef.name);
                 if (validator == null)
                     validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
-                idxSchema.setType(getPigType(validator));
+                idxSchema.setType(StorageHelper.getPigType(validator));
                 allSchemaFields.add(idxSchema);
             }
         }
@@ -485,8 +619,8 @@ public class CassandraStorage extends AbstractCassandraStorage
     public void setPartitionFilter(Expression partitionFilter) throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
     }
 
     /** prepare writer */
@@ -495,33 +629,93 @@ public class CassandraStorage extends AbstractCassandraStorage
         this.writer = writer;
     }
 
-    /** write next row */
-    public void putNext(Tuple t) throws IOException
+    /** convert object to ByteBuffer */
+    protected ByteBuffer objToBB(Object o)
     {
-        /*
-        We support two cases for output:
-        First, the original output:
-            (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
-        For supers, we only accept the original output.
-        */
+        if (o == null)
+            return nullToBB();
+        if (o instanceof java.lang.String)
+            return ByteBuffer.wrap(new DataByteArray((String)o).get());
+        if (o instanceof Integer)
+            return Int32Type.instance.decompose((Integer)o);
+        if (o instanceof Long)
+            return LongType.instance.decompose((Long)o);
+        if (o instanceof Float)
+            return FloatType.instance.decompose((Float)o);
+        if (o instanceof Double)
+            return DoubleType.instance.decompose((Double)o);
+        if (o instanceof UUID)
+            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
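+        // A Tuple whose first element is the string "set", "list" or "map" is
+        // serialized as a CQL collection; any other Tuple becomes a composite.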
+        if(o instanceof Tuple) {
+            List<Object> objects = ((Tuple)o).getAll();
+            //collections
+            if (objects.size() > 0 && objects.get(0) instanceof String)
+            {
+                String collectionType = (String) objects.get(0);
+                if ("set".equalsIgnoreCase(collectionType) ||
+                        "list".equalsIgnoreCase(collectionType))
+                    return objToListOrSetBB(objects.subList(1, objects.size()));
+                else if ("map".equalsIgnoreCase(collectionType))
+                    return objToMapBB(objects.subList(1, objects.size()));
 
-        if (t.size() < 1)
+            }
+            return objToCompositeBB(objects);
+        }
+
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+    }
+
+    private ByteBuffer objToListOrSetBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for(Object sub : objects)
         {
-            // simply nothing here, we can't even delete without a key
-            logger.warn("Empty output skipped, filter empty tuples to suppress 
this warning");
-            return;
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
         }
-        ByteBuffer key = objToBB(t.get(0));
-        if (t.getType(1) == DataType.TUPLE)
-            writeColumnsFromTuple(key, t, 1);
-        else if (t.getType(1) == DataType.BAG)
+        // NOTE: using protocol v1 serialization format for collections so as to not break
+        // compatibility. Not sure if that's the right thing.
+        return CollectionSerializer.pack(serialized, objects.size(), 1);
+    }
+
+    private ByteBuffer objToMapBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for(Object sub : objects)
         {
-            if (t.size() > 2)
-                throw new IOException("No arguments allowed after bag");
-            writeColumnsFromBag(key, (DataBag) t.get(1));
+            List<Object> keyValue = ((Tuple)sub).getAll();
+            for (Object entry: keyValue)
+            {
+                ByteBuffer buffer = objToBB(entry);
+                serialized.add(buffer);
+            }
         }
-        else
-            throw new IOException("Second argument in output must be a tuple 
or bag");
+        // NOTE: using protocol v1 serialization format for collections so as 
to not break
+        // compatibility. Not sure if that's the right thing.
+        return CollectionSerializer.pack(serialized, objects.size(), 1);
+    }
+
+    private ByteBuffer objToCompositeBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        int totalLength = 0;
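+        // Composite encoding: each component is written as a two-byte
+        // big-endian length, the component bytes, then an end-of-component
+        // byte (0); hence the 2 + length + 1 accounting below.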
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+            totalLength += 2 + buffer.remaining() + 1;
+        }
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : serialized)
+        {
+            int length = bb.remaining();
+            out.put((byte) ((length >> 8) & 0xFF));
+            out.put((byte) (length & 0xFF));
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
     }
 
     /** write tuple data to cassandra */
@@ -643,6 +837,19 @@ public class CassandraStorage extends AbstractCassandraStorage
         }
     }
 
+    /** get a list of columns with a defined index */
+    protected List<ColumnDef> getIndexes() throws IOException
+    {
+        CfDef cfdef = getCfDef(loadSignature);
+        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+        for (ColumnDef cdef : cfdef.column_metadata)
+        {
+            if (cdef.index_type != null)
+                indexes.add(cdef);
+        }
+        return indexes;
+    }
+
     /** get a list of Cassandra IndexExpression from Pig expression */
     private List<IndexExpression> filterToIndexExpressions(Expression expression) throws IOException
     {
@@ -713,13 +920,64 @@ public class CassandraStorage extends AbstractCassandraStorage
         return indexClause.getExpressions();
     }
 
+    public ResourceStatistics getStatistics(String location, Job job)
+    {
+        return null;
+    }
+
+    public void cleanupOnFailure(String failure, Job job)
+    {
+    }
+
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+    }
+
+
+    /** StoreFunc methods */
+    public void setStoreFuncUDFContextSignature(String signature)
+    {
+        this.storeSignature = signature;
+    }
+
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+    {
+        return relativeToAbsolutePath(location, curDir);
+    }
+
+    /** output format */
+    public OutputFormat getOutputFormat() throws IOException
+    {
+        try
+        {
+            return FBUtilities.construct(outputFormatClass, "outputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+
+    @Override
+    public InputFormat getInputFormat() throws IOException
+    {
+        try
+        {
+            return FBUtilities.construct(inputFormatClass, "inputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
     /** get a list of index expression */
     private List<IndexExpression> getIndexExpressions() throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
-            return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        if (property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE) != null)
+            return indexExpressionsFromString(property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE));
         else
             return null;
     }
@@ -731,6 +989,129 @@ public class CassandraStorage extends AbstractCassandraStorage
         return getColumnMeta(client, true, true);
     }
 
+
+    /** get column meta data */
+    protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
+            throws org.apache.cassandra.thrift.InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            CharacterCodingException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            NotFoundException
+    {
+        String query = String.format("SELECT column_name, validator, 
index_type, type " +
+                        "FROM %s.%s " +
+                        "WHERE keyspace_name = '%s' AND columnfamily_name = 
'%s'",
+                SystemKeyspace.NAME,
+                LegacySchemaTables.COLUMNS,
+                keyspace,
+                column_family);
+
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+        List<CqlRow> rows = result.rows;
+        List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
+        if (rows == null || rows.isEmpty())
+        {
+            // if CassandraStorage, just return the empty list
+            if (cassandraStorage)
+                return columnDefs;
+
+            // otherwise for CqlNativeStorage, check metadata for classic thrift tables
+            CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+            for (ColumnDefinition def : cfm.regularAndStaticColumns())
+            {
+                ColumnDef cDef = new ColumnDef();
+                String columnName = def.name.toString();
+                String type = def.type.toString();
+                logger.debug("name: {}, type: {} ", columnName, type);
+                cDef.name = ByteBufferUtil.bytes(columnName);
+                cDef.validation_class = type;
+                columnDefs.add(cDef);
+            }
+            // we may not need to include the value column for compact tables as we
+            // could have already processed it as schema_columnfamilies.value_alias
+            if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
+            {
+                ColumnDefinition def = cfm.compactValueColumn();
+                if ("value".equals(def.name.toString()))
+                {
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = def.name.bytes;
+                    cDef.validation_class = def.type.toString();
+                    columnDefs.add(cDef);
+                }
+            }
+            return columnDefs;
+        }
+
+        Iterator<CqlRow> iterator = rows.iterator();
+        while (iterator.hasNext())
+        {
+            CqlRow row = iterator.next();
+            ColumnDef cDef = new ColumnDef();
+            String type = ByteBufferUtil.string(row.getColumns().get(3).value);
+            if (!type.equals("regular"))
+                continue;
+            cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
+            cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
+            ByteBuffer indexType = row.getColumns().get(2).value;
+            if (indexType != null)
+                cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
+            columnDefs.add(cDef);
+        }
+        return columnDefs;
+    }
+
+
+    /** get CFMetaData of a column family */
+    protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
+            throws NotFoundException,
+            org.apache.cassandra.thrift.InvalidRequestException,
+            TException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException
+    {
+        KsDef ksDef = client.describe_keyspace(ks);
+        for (CfDef cfDef : ksDef.cf_defs)
+        {
+            if (cfDef.name.equalsIgnoreCase(cf))
+                return ThriftConversion.fromThrift(cfDef);
+        }
+        return null;
+    }
+
+    /** get index type from string */
+    protected IndexType getIndexType(String type)
+    {
+        type = type.toLowerCase();
+        if ("keys".equals(type))
+            return IndexType.KEYS;
+        else if("custom".equals(type))
+            return IndexType.CUSTOM;
+        else if("composites".equals(type))
+            return IndexType.COMPOSITES;
+        else
+            return null;
+    }
+
+    /** return partition keys */
+    public String[] getPartitionKeys(String location, Job job) throws IOException
+    {
+        if (!usePartitionFilter)
+            return null;
+        List<ColumnDef> indexes = getIndexes();
+        String[] partitionKeys = new String[indexes.size()];
+        for (int i = 0; i < indexes.size(); i++)
+        {
+            partitionKeys[i] = new String(indexes.get(i).getName());
+        }
+        return partitionKeys;
+    }
+
     /** convert key to a tuple */
     private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
     {
@@ -744,15 +1125,26 @@ public class CassandraStorage extends AbstractCassandraStorage
     {
         if( comparator instanceof AbstractCompositeType )
         {
-            setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
+            StorageHelper.setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key));
         }
         else
         {
-            setTupleValue(tuple, 0, cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key));
+            StorageHelper.setTupleValue(tuple, 0, StorageHelper.cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key, nativeProtocolVersion));
         }
 
     }
 
+    /** Deconstructs a composite type to a Tuple. */
+    protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
+    {
+        List<AbstractCompositeType.CompositeComponent> result = comparator.deconstruct(name);
+        Tuple t = TupleFactory.getInstance().newTuple(result.size());
+        for (int i=0; i<result.size(); i++)
+            StorageHelper.setTupleValue(t, i, StorageHelper.cassandraToObj(result.get(i).comparator, result.get(i).value, nativeProtocolVersion));
+
+        return t;
+    }
+
     /** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>
      * [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true]
      * [&use_secondary=true][&comparator=<comparator>][&partitioner=<partitioner>]]*/
@@ -817,10 +1209,206 @@ public class CassandraStorage extends AbstractCassandraStorage
                     "[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage());
         }
     }
-    
+
+
+    /** decompose the query to store the parameters in a map */
+    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
+    {
+        String[] params = query.split("&");
+        Map<String, String> map = new HashMap<String, String>(params.length);
+        for (String param : params)
+        {
+            String[] keyValue = param.split("=");
+            map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
+        }
+        return map;
+    }
+
     public ByteBuffer nullToBB()
     {
         return null;
     }
-}
 
+    /** return the CfDef for the column family */
+    protected CfDef getCfDef(Cassandra.Client client)
+            throws org.apache.cassandra.thrift.InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            NotFoundException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            IOException
+    {
+        // get CF meta data
+        String query = String.format("SELECT type, comparator, subcomparator, 
default_validator, key_validator " +
+                        "FROM %s.%s " +
+                        "WHERE keyspace_name = '%s' AND columnfamily_name = 
'%s'",
+                SystemKeyspace.NAME,
+                LegacySchemaTables.COLUMNFAMILIES,
+                keyspace,
+                column_family);
+
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+        if (result == null || result.rows == null || result.rows.isEmpty())
+            return null;
+
+        Iterator<CqlRow> iteraRow = result.rows.iterator();
+        CfDef cfDef = new CfDef();
+        cfDef.keyspace = keyspace;
+        cfDef.name = column_family;
+        if (iteraRow.hasNext())
+        {
+            CqlRow cqlRow = iteraRow.next();
+
+            cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
+            cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
+            ByteBuffer subComparator = cqlRow.columns.get(2).value;
+            if (subComparator != null)
+                cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
+            cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
+            cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+        }
+        cfDef.column_metadata = getColumnMetadata(client);
+        return cfDef;
+    }
+
+    /** get the columnfamily definition for the signature */
+    protected CfDef getCfDef(String signature) throws IOException
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        String prop = property.getProperty(signature);
+        return cfdefFromString(prop);
+    }
+
+    /** convert string back to CfDef */
+    protected static CfDef cfdefFromString(String st) throws IOException
+    {
+        assert st != null;
+        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        CfDef cfDef = new CfDef();
+        try
+        {
+            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+        }
+        catch (TException e)
+        {
+            throw new IOException(e);
+        }
+        return cfDef;
+    }
+
+    /** convert CfDef to string */
+    protected static String cfdefToString(CfDef cfDef) throws IOException
+    {
+        assert cfDef != null;
+        // this is so awful it's kind of cool!
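+        // (UDFContext properties can only carry strings, so the CfDef is
+        // thrift-serialized and hex-encoded for the round trip)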
+        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            return Hex.bytesToHex(serializer.serialize(cfDef));
+        }
+        catch (TException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /** parse the string to a cassandra data type */
+    protected AbstractType parseType(String type) throws IOException
+    {
+        try
+        {
+            // always treat counters like longs, specifically CCT.compose is not what we need
+            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+                return LongType.instance;
+            return TypeParser.parse(type);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+        catch (SyntaxException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /** convert a column to a tuple */
+    protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException
+    {
+        Tuple pair = TupleFactory.getInstance().newTuple(2);
+
+        ByteBuffer colName = col.name().toByteBuffer();
+
+        // name
+        if(comparator instanceof AbstractCompositeType)
+            StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName));
+        else
+            StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion));
+
+        // value
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+        if (validators.get(colName) == null)
+        {
+            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion));
+        }
+        else
+            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion));
+        return pair;
+    }
+
+    /** construct a map to store the marshaller type to cassandra data type mapping */
+    protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
+    {
+        Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
+        AbstractType comparator;
+        AbstractType subcomparator;
+        AbstractType default_validator;
+        AbstractType key_validator;
+
+        comparator = parseType(cfDef.getComparator_type());
+        subcomparator = parseType(cfDef.getSubcomparator_type());
+        default_validator = parseType(cfDef.getDefault_validation_class());
+        key_validator = parseType(cfDef.getKey_validation_class());
+
+        marshallers.put(MarshallerType.COMPARATOR, comparator);
+        marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
+        marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
+        marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
+        return marshallers;
+    }
+
+    /** get the validators */
+    protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
+    {
+        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
+        for (ColumnDef cd : cfDef.getColumn_metadata())
+        {
+            if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
+            {
+                AbstractType validator = null;
+                try
+                {
+                    validator = TypeParser.parse(cd.getValidation_class());
+                    if (validator instanceof CounterColumnType)
+                        validator = LongType.instance;
+                    validators.put(cd.name, validator);
+                }
+                catch (ConfigurationException e)
+                {
+                    throw new IOException(e);
+                }
+                catch (SyntaxException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+        }
+        return validators;
+    }
+}
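
For reference, both storage classes serialize composite column names with the
same hand-rolled encoding used in objToCompositeBB above: a two-byte big-endian
length, the component bytes, then an end-of-component byte (0), which matches
Cassandra's CompositeType layout. A minimal standalone sketch of just that
encoding (the class name is illustrative and not part of this patch):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class CompositeEncodingSketch
    {
        // Pack components the way objToCompositeBB does: two-byte big-endian
        // length, the component bytes, then an end-of-component byte (0).
        public static ByteBuffer pack(List<byte[]> components)
        {
            int totalLength = 0;
            for (byte[] c : components)
                totalLength += 2 + c.length + 1;
            ByteBuffer out = ByteBuffer.allocate(totalLength);
            for (byte[] c : components)
            {
                out.put((byte) ((c.length >> 8) & 0xFF)); // length, high byte
                out.put((byte) (c.length & 0xFF));        // length, low byte
                out.put(c);                               // component bytes
                out.put((byte) 0);                        // end-of-component
            }
            out.flip();
            return out;
        }

        public static void main(String[] args)
        {
            ByteBuffer bb = pack(Arrays.asList(
                    "a".getBytes(StandardCharsets.UTF_8),
                    "bc".getBytes(StandardCharsets.UTF_8)));
            System.out.println(bb.remaining()); // 9 = (2+1+1) + (2+2+1)
        }
    }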

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 7887085..91cdd02 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -17,48 +17,78 @@
  */
 package org.apache.cassandra.hadoop.pig;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import org.apache.cassandra.db.BufferCell;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.utils.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.Expression;
-import org.apache.pig.ResourceSchema;
+import org.apache.pig.*;
 import org.apache.pig.Expression.OpType;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder;
 
-import com.datastax.driver.core.Row;
 
-public class CqlNativeStorage extends AbstractCassandraStorage
+public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
 {
+    protected String DEFAULT_INPUT_FORMAT;
+    protected String DEFAULT_OUTPUT_FORMAT;
+
+    protected String username;
+    protected String password;
+    protected String keyspace;
+    protected String column_family;
+    protected String loadSignature;
+    protected String storeSignature;
+
+    protected Configuration conf;
+    protected String inputFormatClass;
+    protected String outputFormatClass;
+    protected int splitSize = 64 * 1024;
+    protected String partitionerClass;
+    protected boolean usePartitionFilter = false;
+    protected String initHostAddress;
+    protected String rpcPort;
+    protected int nativeProtocolVersion = 1;
+
    private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
     private int pageSize = 1000;
     private String columns;
     private String outputQuery;
     private String whereClause;
-    private boolean hasCompactValueAlias = false;
 
     private RecordReader<Long, Row> reader;
     private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
@@ -119,21 +149,20 @@ public class CqlNativeStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
+            TableInfo tableMetadata = getCfInfo(loadSignature);
             Row row = reader.getCurrentValue();
-            Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
-            Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
+            Tuple tuple = TupleFactory.getInstance().newTuple(tableMetadata.getColumns().size());
+            Iterator<ColumnInfo> itera = tableMetadata.getColumns().iterator();
             int i = 0;
             while (itera.hasNext())
             {
-                ColumnDef cdef = itera.next();
-                ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate()));
+                ColumnInfo cdef = itera.next();
+                ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
                 if (columnValue != null)
                 {
-                    Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue);
-                    AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
-                    setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator);
+                    Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue);
+                    AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
+                    setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator);
                 }
                 else
                     tuple.set(i, null);
@@ -148,15 +177,12 @@ public class CqlNativeStorage extends AbstractCassandraStorage
     }
 
     /** convert a cql column to an object */
-    private Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
+    private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException
     {
         // standard
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         ByteBuffer cellName = col.name().toByteBuffer();
-        if (validators.get(cellName) == null)
-            return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value());
-        else
-            return cassandraToObj(validators.get(cellName), col.value());
+        return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion);
     }
 
     /** set the value to the position of the tuple */
@@ -165,7 +191,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         if (validator instanceof CollectionType)
             setCollectionTupleValues(tuple, position, value, validator);
         else
-           setTupleValue(tuple, position, value);
+           StorageHelper.setTupleValue(tuple, position, value);
     }
 
     /** set the values of set/list at and after the position of the tuple */
@@ -220,173 +246,33 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         return obj;
     }
 
-    /** include key columns */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
-            throws InvalidRequestException,
-            UnavailableException,
-            TimedOutException,
-            SchemaDisagreementException,
-            TException,
-            CharacterCodingException,
-            org.apache.cassandra.exceptions.InvalidRequestException,
-            ConfigurationException,
-            NotFoundException
-    {
-        List<ColumnDef> keyColumns = null;
-        // get key columns
+    /** get the columnfamily definition for the signature */
+    protected TableInfo getCfInfo(String signature) throws IOException
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CqlNativeStorage.class);
+        TableInfo cfInfo;
         try
         {
-            keyColumns = getKeysMeta(client);
+            cfInfo = cfdefFromString(property.getProperty(signature));
         }
-        catch(Exception e)
+        catch (ClassNotFoundException e)
         {
-            logger.error("Error in retrieving key columns" , e);
+            throw new IOException(e);
         }
-
-        // get other columns
-        List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
-
-        // combine all columns in a list
-        if (keyColumns != null && columns != null)
-            keyColumns.addAll(columns);
-
-        return keyColumns;
+        return cfInfo;
     }
 
-    /** get keys meta data */
-    private List<ColumnDef> getKeysMeta(Cassandra.Client client)
-            throws Exception
+    /** return the table metadata for the column family */
+    protected TableMetadata getCfInfo(Session client)
+            throws NoHostAvailableException,
+            AuthenticationException,
+            IllegalStateException
     {
-        String query = "SELECT key_aliases, " +
-                "       column_aliases, " +
-                "       key_validator, " +
-                "       comparator, " +
-                "       keyspace_name, " +
-                "       value_alias, " +
-                "       default_validator " +
-                "FROM system.schema_columnfamilies " +
-                "WHERE keyspace_name = '%s'" +
-                "  AND columnfamily_name = '%s' ";
-
-        CqlResult result = client.execute_cql3_query(
-                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
-                Compression.NONE,
-                ConsistencyLevel.ONE);
-
-        if (result == null || result.rows == null || result.rows.isEmpty())
-            return null;
-
-        Iterator<CqlRow> iteraRow = result.rows.iterator();
-        List<ColumnDef> keys = new ArrayList<ColumnDef>();
-        if (iteraRow.hasNext())
-        {
-            CqlRow cqlRow = iteraRow.next();
-            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
-            logger.debug("Found ksDef name: {}", name);
-            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
-            logger.debug("partition keys: {}", keyString);
-            List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
-            Iterator<String> iterator = keyNames.iterator();
-            while (iterator.hasNext())
-            {
-                ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
-                keys.add(cDef);
-            }
-            // classic thrift tables
-            if (keys.size() == 0)
-            {
-                CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
-                for (ColumnDefinition def : cfm.partitionKeyColumns())
-                {
-                    String key = def.name.toString();
-                    logger.debug("name: {} ", key);
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(key);
-                    keys.add(cDef);
-                }
-                for (ColumnDefinition def : cfm.clusteringColumns())
-                {
-                    String key = def.name.toString();
-                    logger.debug("name: {} ", key);
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(key);
-                    keys.add(cDef);
-                }
-            }
-
-            keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
-            logger.debug("cluster keys: {}", keyString);
-            keyNames = FBUtilities.fromJsonList(keyString);
-
-            iterator = keyNames.iterator();
-            while (iterator.hasNext())
-            {
-                ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
-                keys.add(cDef);
-            }
-
-            String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
-            logger.debug("row key validator: {}", validator);
-            AbstractType<?> keyValidator = parseType(validator);
-
-            Iterator<ColumnDef> keyItera = keys.iterator();
-            if (keyValidator instanceof CompositeType)
-            {
-                Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
-                while (typeItera.hasNext())
-                    keyItera.next().validation_class = typeItera.next().toString();
-            }
-            else
-                keyItera.next().validation_class = keyValidator.toString();
-
-            validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
-            logger.debug("cluster key validator: {}", validator);
-
-            if (keyItera.hasNext() && validator != null && !validator.isEmpty())
-            {
-                AbstractType<?> clusterKeyValidator = parseType(validator);
-
-                if (clusterKeyValidator instanceof CompositeType)
-                {
-                    Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
-                    while (keyItera.hasNext())
-                        keyItera.next().validation_class = typeItera.next().toString();
-                }
-                else
-                    keyItera.next().validation_class = clusterKeyValidator.toString();
-            }
-
-            // compact value_alias column
-            if (cqlRow.columns.get(5).value != null)
-            {
-                try
-                {
-                    String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
-                    logger.debug("default validator: {}", compactValidator);
-                    AbstractType<?> defaultValidator = parseType(compactValidator);
-
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = cqlRow.columns.get(5).value;
-                    cDef.validation_class = defaultValidator.toString();
-                    keys.add(cDef);
-                    hasCompactValueAlias = true;
-                }
-                catch (Exception e)
-                {
-                    // no compact column at value_alias
-                }
-            }
-
-        }
-        return keys;
+        // get CF meta data
+        return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family));
     }
 
-
     /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
     public void putNext(Tuple t) throws IOException
     {
@@ -441,6 +327,91 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         return keys;
     }
 
+    /** convert object to ByteBuffer */
+    protected ByteBuffer objToBB(Object o)
+    {
+        if (o == null)
+            return nullToBB();
+        if (o instanceof java.lang.String)
+            return ByteBuffer.wrap(new DataByteArray((String)o).get());
+        if (o instanceof Integer)
+            return Int32Type.instance.decompose((Integer)o);
+        if (o instanceof Long)
+            return LongType.instance.decompose((Long)o);
+        if (o instanceof Float)
+            return FloatType.instance.decompose((Float)o);
+        if (o instanceof Double)
+            return DoubleType.instance.decompose((Double)o);
+        if (o instanceof UUID)
+            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+        if (o instanceof Tuple)
+        {
+            List<Object> objects = ((Tuple)o).getAll();
+            //collections
+            if (objects.size() > 0 && objects.get(0) instanceof String)
+            {
+                String collectionType = (String) objects.get(0);
+                if ("set".equalsIgnoreCase(collectionType) ||
+                        "list".equalsIgnoreCase(collectionType))
+                    return objToListOrSetBB(objects.subList(1, objects.size()));
+                else if ("map".equalsIgnoreCase(collectionType))
+                    return objToMapBB(objects.subList(1, objects.size()));
+
+            }
+            return objToCompositeBB(objects);
+        }
+
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+    }
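+
+    // For reference, objToBB relies on a positional tuple convention: a literal
+    // first field of "set"/"list"/"map" marks a collection, and any other tuple
+    // becomes a composite. Hypothetical sample values, for illustration only:
+    //   42                  -> Int32Type.instance.decompose(42)
+    //   ("list", "a", "b")  -> objToListOrSetBB(["a", "b"])
+    //   ("map", ("k","v"))  -> objToMapBB([("k","v")])
+    //   ("x", 1)            -> objToCompositeBB(["x", 1])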
+
+    private ByteBuffer objToListOrSetBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for (Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+        }
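+        // the trailing 3 is the native protocol version governing the packed collection format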
+        return CollectionSerializer.pack(serialized, objects.size(), 3);
+    }
+
+    private ByteBuffer objToMapBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for (Object sub : objects)
+        {
+            List<Object> keyValue = ((Tuple)sub).getAll();
+            for (Object entry: keyValue)
+            {
+                ByteBuffer buffer = objToBB(entry);
+                serialized.add(buffer);
+            }
+        }
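+        // entries are flattened to alternating key/value buffers; pack() takes the entry count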
+        return CollectionSerializer.pack(serialized, objects.size(), 3);
+    }
+
+    private ByteBuffer objToCompositeBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        int totalLength = 0;
+        for (Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+            totalLength += 2 + buffer.remaining() + 1;
+        }
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : serialized)
+        {
+            int length = bb.remaining();
+            out.put((byte) ((length >> 8) & 0xFF));
+            out.put((byte) (length & 0xFF));
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
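+
+    // The loop above emits the standard CompositeType framing: a 2-byte big-endian
+    // length, the component bytes, then a 0x00 end-of-component octet. For two
+    // hypothetical components "ab" and "c" the output would be:
+    //   00 02 61 62 00   ("ab": length 2, bytes, terminator)
+    //   00 01 63 00      ("c":  length 1, byte,  terminator)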
+
     /** send CQL query request using data from tuple */
    private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
     {
@@ -487,30 +458,50 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         }
     }
 
+    /** build a map from column name to the column's value validator */
+    protected Map<ByteBuffer, AbstractType> getValidatorMap(TableInfo cfDef) throws IOException
+    {
+        Map<ByteBuffer, AbstractType> validators = new HashMap<>();
+        for (ColumnInfo cd : cfDef.getColumns())
+        {
+            if (cd.getTypeName() != null)
+            {
+                try
+                {
+                    AbstractType validator = TypeParser.parseCqlName(cd.getTypeName());
+                    if (validator instanceof CounterColumnType)
+                        validator = LongType.instance;
+                    validators.put(ByteBufferUtil.bytes(cd.getName()), validator);
+                }
+                catch (ConfigurationException | SyntaxException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+        }
+        return validators;
+    }
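+
+    // For a hypothetical table defined as (id int PRIMARY KEY, name text), the map
+    // built above would roughly be:
+    //   bytes("id")   -> Int32Type.instance
+    //   bytes("name") -> UTF8Type.instance
+    // (counter columns are reported as LongType, per the branch above)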
+
     /** schema: (value, value, value) where keys are in the front. */
    public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        TableInfo cfInfo = getCfInfo(loadSignature);
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
 
-        // get default marshallers and validators
-        Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
+        // get default validators
+        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfInfo);
 
         // will contain all fields for this schema
        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
 
-        for (ColumnDef cdef : cfDef.column_metadata)
+        for (ColumnInfo cdef : cfInfo.getColumns())
         {
             ResourceFieldSchema valSchema = new ResourceFieldSchema();
-            AbstractType validator = validators.get(cdef.name);
-            if (validator == null)
-                validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+            AbstractType validator = validators.get(ByteBufferUtil.bytes(cdef.getName()));
+            valSchema.setName(cdef.getName());
-            valSchema.setType(getPigType(validator));
+            valSchema.setType(StorageHelper.getPigType(validator));
             allSchemaFields.add(valSchema);
         }
 
@@ -522,8 +513,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage
    public void setPartitionFilter(Expression partitionFilter) throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
+        Properties property = context.getUDFProperties(CqlNativeStorage.class);
+        property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
     }
 
     /**
@@ -557,8 +548,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage
     private String getWhereClauseForPartitionFilter()
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        return property.getProperty(PARTITION_FILTER_SIGNATURE);
+        Properties property = context.getUDFProperties(CqlNativeStorage.class);
+        return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE);
     }
 
     /** set read configuration settings */
@@ -631,7 +622,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage
             CqlConfigHelper.setInputWhereClauses(conf, whereClause);
 
        String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
-        String wc = whereClause != null && !whereClause.trim().isEmpty() 
+        String wc = whereClause != null && !whereClause.trim().isEmpty()
                               ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
                               : whereClauseForPartitionFilter;
 
@@ -639,17 +630,17 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         {
             logger.debug("where clause: {}", wc);
             CqlConfigHelper.setInputWhereClauses(conf, wc);
-        } 
-        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        }
+        if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
             {
-                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
             }
             catch (NumberFormatException e)
             {
                throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
-            }           
+            }
         }
 
         if (ConfigHelper.getInputInitialAddress(conf) == null)
@@ -700,6 +691,74 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         initSchema(storeSignature);
     }
 
+    /** fetch the column family schema from Cassandra and cache it in the UDF context */
+    protected void initSchema(String signature) throws IOException
+    {
+        Properties properties = UDFContext.getUDFContext().getUDFProperties(CqlNativeStorage.class);
+
+        // Only get the schema if we haven't already gotten it
+        if (!properties.containsKey(signature))
+        {
+            try
+            {
+                Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect();
+                client.execute("USE " + keyspace);
+
+                // fetch the table metadata for the column family
+                TableMetadata cfInfo = getCfInfo(client);
+
+                if (cfInfo != null)
+                {
+                    properties.setProperty(signature, cfdefToString(cfInfo));
+                }
+                else
+                    throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
+                            column_family,
+                            keyspace));
+            }
+            catch (Exception e)
+            {
+                throw new IOException(e);
+            }
+        }
+    }
+
+
+    /** serialize the table metadata to a Base64-encoded TableInfo string */
+    protected static String cfdefToString(TableMetadata cfDef) throws IOException
+    {
+        TableInfo tableInfo = new TableInfo(cfDef);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(tableInfo);
+        oos.close();
+        return new String(Base64Coder.encode(baos.toByteArray()));
+    }
+
+    /** deserialize the Base64-encoded string back to a TableInfo */
+    protected static TableInfo cfdefFromString(String st) throws IOException, ClassNotFoundException
+    {
+        byte[] data = Base64Coder.decode(st);
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
+        Object o = ois.readObject();
+        ois.close();
+        return (TableInfo) o;
+    }
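+
+    // Round-trip sketch, assuming 'metadata' is a driver TableMetadata for the table:
+    //   String encoded = cfdefToString(metadata);     // TableInfo -> Java serialization -> Base64
+    //   TableInfo decoded = cfdefFromString(encoded); // Base64 -> TableInfo
+    //   assert decoded.getName().equals(metadata.getName());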
+
+    /** decompose the query string into a map of its parameters */
+    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
+    {
+        String[] params = query.split("&");
+        Map<String, String> map = new HashMap<String, String>(params.length);
+        for (String param : params)
+        {
+            // split on the first '=' only, so values containing a literal '=' survive
+            String[] keyValue = param.split("=", 2);
+            map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
+        }
+        return map;
+    }
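+
+    // Usage sketch with a hypothetical query string:
+    //   Map<String, String> params = getQueryMap("page_size=100&where_clause=a%3D1");
+    //   params.get("page_size");    // "100"
+    //   params.get("where_clause"); // "a=1" (URL-decoded)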
+
     private void setLocationFromUri(String location) throws IOException
     {
         try
@@ -808,11 +867,171 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         }
     }
 
-    /**
-     * Thrift API can't handle null, so use empty byte array
-     */
     public ByteBuffer nullToBB()
     {
         return ByteBuffer.wrap(new byte[0]);
     }
+
+    /** output format */
+    public OutputFormat getOutputFormat() throws IOException
+    {
+        try
+        {
+            return FBUtilities.construct(outputFormatClass, "outputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    public void cleanupOnFailure(String failure, Job job)
+    {
+    }
+
+    public void cleanupOnSuccess(String location, Job job) throws IOException
+    {
+    }
+
+    /** return partition keys */
+    public String[] getPartitionKeys(String location, Job job) throws IOException
+    {
+        if (!usePartitionFilter)
+            return null;
+        TableInfo tableMetadata = getCfInfo(loadSignature);
+        String[] partitionKeys = new String[tableMetadata.getPartitionKey().size()];
+        for (int i = 0; i < tableMetadata.getPartitionKey().size(); i++)
+        {
+            partitionKeys[i] = tableMetadata.getPartitionKey().get(i).getName();
+        }
+        return partitionKeys;
+    }
+
+    public void checkSchema(ResourceSchema schema) throws IOException
+    {
+        // we don't care about types, they all get cast to ByteBuffers
+    }
+
+    public ResourceStatistics getStatistics(String location, Job job)
+    {
+        return null;
+    }
+
+    @Override
+    public InputFormat getInputFormat() throws IOException
+    {
+        try
+        {
+            return FBUtilities.construct(inputFormatClass, "inputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+    {
+        return relativeToAbsolutePath(location, curDir);
+    }
+
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+    {
+        return location;
+    }
+
+    @Override
+    public void setUDFContextSignature(String signature)
+    {
+        this.loadSignature = signature;
+    }
+
+    /** StoreFunc methods */
+    public void setStoreFuncUDFContextSignature(String signature)
+    {
+        this.storeSignature = signature;
+    }
+
+    /** set hadoop cassandra connection settings */
+    protected void setConnectionInformation() throws IOException
+    {
+        StorageHelper.setConnectionInformation(conf);
+        if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
+            inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
+        else
+            inputFormatClass = DEFAULT_INPUT_FORMAT;
+        if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
+            outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
+        else
+            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+    }
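+
+    // The format classes can be overridden per job run through environment variables,
+    // e.g. (illustrative value):
+    //   export PIG_INPUT_FORMAT=ColumnFamilyInputFormat
+    // A bare name is expanded by getFullyQualifiedClassName below to
+    // org.apache.cassandra.hadoop.ColumnFamilyInputFormat; dotted names pass through unchanged.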
+
+    /** get the full class name */
+    protected String getFullyQualifiedClassName(String classname)
+    {
+        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+    }
+}
+
+class TableInfo implements Serializable
+{
+    private final List<ColumnInfo> columns;
+    private final List<ColumnInfo> partitionKey;
+    private final String name;
+
+    public TableInfo(TableMetadata tableMetadata)
+    {
+        List<ColumnMetadata> cmColumns = tableMetadata.getColumns();
+        columns = new ArrayList<>(cmColumns.size());
+        for (ColumnMetadata cm : cmColumns)
+        {
+            columns.add(new ColumnInfo(this, cm));
+        }
+        List<ColumnMetadata> cmPartitionKey = tableMetadata.getPartitionKey();
+        partitionKey = new ArrayList<>(cmPartitionKey.size());
+        for (ColumnMetadata cm : cmPartitionKey)
+        {
+            partitionKey.add(new ColumnInfo(this, cm));
+        }
+        name = tableMetadata.getName();
+    }
+
+    public List<ColumnInfo> getPartitionKey()
+    {
+        return partitionKey;
+    }
+
+    public List<ColumnInfo> getColumns()
+    {
+        return columns;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
 }
+
+class ColumnInfo implements Serializable
+{
+    private final TableInfo table;
+    private final String name;
+    private final String typeName;
+
+    public ColumnInfo(TableInfo tableInfo, ColumnMetadata columnMetadata)
+    {
+        table = tableInfo;
+        name = columnMetadata.getName();
+        typeName = columnMetadata.getType().toString();
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    public String getTypeName()
+    {
+        return typeName;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
new file mode 100644
index 0000000..66836b2
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
@@ -0,0 +1,121 @@
+package org.apache.cassandra.hadoop.pig;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.serializers.CollectionSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class StorageHelper
+{
+    // system environment variables that can be set to configure connection info:
+    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+    public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+    public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+    public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+    public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+    public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+    public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
+    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+    public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
+
+
+    public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
+
+    protected static void setConnectionInformation(Configuration conf)
+    {
+        if (System.getenv(PIG_RPC_PORT) != null)
+        {
+            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+        }
+
+        if (System.getenv(PIG_INPUT_RPC_PORT) != null)
+            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
+        if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
+            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
+
+        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+        {
+            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+        }
+        if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
+            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
+        if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
+            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
+
+        if (System.getenv(PIG_PARTITIONER) != null)
+        {
+            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+        }
+        if (System.getenv(PIG_INPUT_PARTITIONER) != null)
+            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
+        if (System.getenv(PIG_OUTPUT_PARTITIONER) != null)
+            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
+    }
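+
+    // Example environment, illustrative values only; the *_INPUT_*/*_OUTPUT_* variants
+    // are applied after the shared ones above, so they win for their side of the job:
+    //   export PIG_INITIAL_ADDRESS=10.0.0.1
+    //   export PIG_RPC_PORT=9160
+    //   export PIG_PARTITIONER=org.apache.cassandra.dht.Murmur3Partitioner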
+
+    protected static Object cassandraToObj(AbstractType validator, ByteBuffer value, int nativeProtocolVersion)
+    {
+        if (validator instanceof DecimalType || validator instanceof InetAddressType)
+            return validator.getString(value);
+
+        if (validator instanceof CollectionType)
+        {
+            // For CollectionType, the compose() method assumes the v3 protocol format of collections, which
+            // is not correct here since we query using the CQL-over-thrift interface, which uses the pre-v3 format
+            return ((CollectionSerializer) validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
+        }
+
+        return validator.compose(value);
+    }
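+
+    // Decoding sketch with assumed inputs:
+    //   Object s = cassandraToObj(UTF8Type.instance, ByteBufferUtil.bytes("hello"), 3); // -> "hello"
+    // DecimalType/InetAddressType come back in string form, and collections are
+    // deserialized with the caller-supplied protocol version rather than compose().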
+
+    /** set the value at the given position of the tuple */
+    protected static void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+    {
+        if (value instanceof BigInteger)
+            pair.set(position, ((BigInteger) value).intValue());
+        else if (value instanceof ByteBuffer)
+            pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+        else if (value instanceof UUID)
+            pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
+        else if (value instanceof Date)
+            pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
+        else
+            pair.set(position, value);
+    }
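+
+    // Conversion sketch (assumed two-field tuple; setTupleValue throws ExecException):
+    //   Tuple t = TupleFactory.getInstance().newTuple(2);
+    //   setTupleValue(t, 0, BigInteger.valueOf(7));      // narrowed to int 7
+    //   setTupleValue(t, 1, ByteBufferUtil.bytes("x"));  // wrapped as DataByteArray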
+
+    /** get the pig type for the given cassandra data type */
+    protected static byte getPigType(AbstractType type)
+    {
+        if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
+            return DataType.LONG;
+        else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
+            return DataType.INTEGER;
+        else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
+            return DataType.CHARARRAY;
+        else if (type instanceof FloatType)
+            return DataType.FLOAT;
+        else if (type instanceof DoubleType)
+            return DataType.DOUBLE;
+        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
+            return DataType.TUPLE;
+
+        return DataType.BYTEARRAY;
+    }
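+
+    // Summary of the mapping implemented above:
+    //   LongType / DateType / TimestampType                  -> LONG
+    //   IntegerType / Int32Type                              -> INTEGER
+    //   AsciiType / UTF8Type / DecimalType / InetAddressType -> CHARARRAY
+    //   FloatType -> FLOAT; DoubleType -> DOUBLE
+    //   AbstractCompositeType / CollectionType               -> TUPLE
+    //   anything else                                        -> BYTEARRAY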
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 06d83dd..6991958 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -92,7 +92,7 @@ public class SSTableLoader implements StreamEventHandler
                     return false;
                 }
 
-                CFMetaData metadata = client.getCFMetaData(keyspace, desc.cfname);
+                CFMetaData metadata = client.getTableMetadata(desc.cfname);
                 if (metadata == null)
                 {
                    outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname));
@@ -251,7 +251,9 @@ public class SSTableLoader implements StreamEventHandler
         /**
          * Stop the client.
          */
-        public void stop() {}
+        public void stop()
+        {
+        }
 
         /**
          * Provides connection factory.
@@ -268,7 +270,12 @@ public class SSTableLoader implements StreamEventHandler
          * Validate that {@code keyspace} is an existing keyspace and {@code
          * cfName} one of its existing column families.
          */
-        public abstract CFMetaData getCFMetaData(String keyspace, String cfName);
+        public abstract CFMetaData getTableMetadata(String tableName);
+
+        public void setTableMetadata(CFMetaData cfm)
+        {
+            throw new RuntimeException();
+        }
 
         public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d6ce46e..c17d2d7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4117,8 +4117,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         SSTableLoader.Client client = new SSTableLoader.Client()
         {
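+            // capture the keyspace from init(); getTableMetadata(tableName) no longer receives it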
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 try
                 {
                     setPartitioner(DatabaseDescriptor.getPartitioner());
@@ -4135,14 +4138,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 }
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         };
 
-        SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput());
-        return loader.stream();
+        return new SSTableLoader(dir, client, new OutputHandler.LogOutput()).stream();
     }
 
     public void rescheduleFailedDeletions()
