Author: brandonwilliams
Date: Fri Mar  4 00:41:55 2011
New Revision: 1076905

URL: http://svn.apache.org/viewvc?rev=1076905&view=rev
Log:
Pig storefunc.
Patch by brandonwilliams, reviewed by Jeremy Hanna for CASSANDRA-1828.

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/contrib/pig/README.txt
    cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra
    cassandra/branches/cassandra-0.7/contrib/pig/build.xml
    cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1076905&r1=1076904&r2=1076905&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Mar  4 00:41:55 2011
@@ -2,6 +2,7 @@
  * add nodetool join command (CASSANDRA-2160)
  * fix secondary indexes on pre-existing or streamed data (CASSANDRA-2244)
 * initialize endpoint in gossiper earlier (CASSANDRA-2228)
+ * add ability to write to Cassandra from Pig (CASSANDRA-1828)
 
 0.7.3
  * Keep endpoint state until aVeryLongTime (CASSANDRA-2115)

Modified: cassandra/branches/cassandra-0.7/contrib/pig/README.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/README.txt?rev=1076905&r1=1076904&r2=1076905&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/README.txt (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/README.txt Fri Mar  4 00:41:55 2011
@@ -1,4 +1,5 @@
-A Pig LoadFunc that reads all columns from a given ColumnFamily.
+A Pig storage class that reads all columns from a given ColumnFamily, or writes
+properly formatted results into a ColumnFamily.
 
 Setup:
 
@@ -7,10 +8,15 @@ configuration and set the PIG_HOME and J
 variables to the location of a Pig >= 0.7.0 install and your Java
 install. 
 
+NOTE: until there is a Pig release that uses jackson > 1.0.1 (see
+https://issues.apache.org/jira/browse/PIG-1863), you will need to build Pig
+yourself with jackson 1.4 if you intend to _output_ to Cassandra.  To do
+this, edit Pig's ivy/libraries.properties and run ant.
+
 If you would like to run using the Hadoop backend, you should
 also set PIG_CONF_DIR to the location of your Hadoop config.
 
-FInally, set the following as environment variables (uppercase,
+Finally, set the following as environment variables (uppercase,
 underscored), or as Hadoop configuration variables (lowercase, dotted):
 * PIG_RPC_PORT or cassandra.thrift.port : the port thrift is listening on 
 * PIG_INITIAL_ADDRESS or cassandra.thrift.address : initial address to connect to
@@ -40,3 +46,11 @@ grunt> namecounts = FOREACH namegroups G
 grunt> orderednames = ORDER namecounts BY $0;
 grunt> topnames = LIMIT orderednames 50;
 grunt> dump topnames;
+
+Outputting to Cassandra requires the same tuple format that loading produces,
+so the simplest example is:
+
+grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage();
+grunt> STORE rows INTO 'cassandra://Keyspace1/Standard2' USING CassandraStorage();
+
+This will copy the ColumnFamily.  Note that the destination ColumnFamily must
+already exist for this to work.
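+
+To store data that did not come from Cassandra, first generate the same
+(key, {(name, value), ...}) shape, for example by grouping.  The following is
+an untested sketch; the input file and its schema are illustrative only:
+
+grunt> data = LOAD 'some_input' AS (key:chararray, name:chararray, value:chararray);
+grunt> grouped = GROUP data BY key;
+grunt> rows = FOREACH grouped GENERATE group, data.(name, value);
+grunt> STORE rows INTO 'cassandra://Keyspace1/Standard2' USING CassandraStorage();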

Modified: cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra?rev=1076905&r1=1076904&r2=1076905&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra Fri Mar  4 00:41:55 2011
@@ -38,7 +38,7 @@ if [ "x$PIG_HOME" = "x" ]; then
 fi
 
 # pig jar.
-PIG_JAR=$PIG_HOME/pig*core.jar
+PIG_JAR=$PIG_HOME/pig*.jar
 if [ ! -e $PIG_JAR ]; then
     echo "Unable to locate Pig jar" >&2
     exit 1

Modified: cassandra/branches/cassandra-0.7/contrib/pig/build.xml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/build.xml?rev=1076905&r1=1076904&r2=1076905&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/build.xml (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/build.xml Fri Mar  4 00:41:55 2011
@@ -21,7 +21,7 @@
     <!-- stores the environment for locating PIG_HOME -->
     <property environment="env" />
     <property name="cassandra.dir" value="../.." />
-    <property name="cassandra.lib" value="" />
+    <property name="cassandra.lib" value="${cassandra.dir}/lib" />
    <property name="cassandra.classes" value="${cassandra.dir}/build/classes" />
     <property name="build.src" value="${basedir}/src" />
     <property name="build.lib" value="${basedir}/lib" />
@@ -31,8 +31,9 @@
 
     <path id="pig.classpath">
         <fileset file="${env.PIG_HOME}/pig*.jar" />
-        <fileset dir="${cassandra.dir}/lib">
+        <fileset dir="${cassandra.lib}">
             <include name="libthrift*.jar" />
+            <include name="avro*.jar" />
         </fileset>
         <fileset dir="${cassandra.dir}/build/lib/jars">
             <include name="google-collections*.jar" />

Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1076905&r1=1076904&r2=1076905&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Mar  4 00:41:55 2011
@@ -20,33 +20,36 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.avro.Deletion;
+import org.apache.cassandra.avro.ColumnOrSuperColumn;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.*;
 
-import org.apache.pig.LoadFunc;
+import org.apache.pig.*;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DefaultDataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 
 /**
  * A LoadFunc wrapping ColumnFamilyInputFormat.
  *
 * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
  */
-public class CassandraStorage extends LoadFunc
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown
 {
    // system environment variables that can be set to configure connection info:
    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
@@ -56,9 +59,11 @@ public class CassandraStorage extends Lo
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private final static int LIMIT = 1024;
+    private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
     private Configuration conf;
     private RecordReader reader;
+    private RecordWriter writer;
 
     @Override
     public Tuple getNext() throws IOException
@@ -116,8 +121,7 @@ public class CassandraStorage extends Lo
     @Override
     public InputFormat getInputFormat()
     {
-        ColumnFamilyInputFormat inputFormat = new ColumnFamilyInputFormat();
-        return inputFormat;
+        return new ColumnFamilyInputFormat();
     }
 
     @Override
@@ -126,38 +130,50 @@ public class CassandraStorage extends Lo
         this.reader = reader;
     }
 
-    @Override
-    public void setLocation(String location, Job job) throws IOException
+    private String[] parseLocation(String location) throws IOException
     {
         // parse uri into keyspace and columnfamily
-        String ksname, cfname;
+        String names[];
         try
         {
             if (!location.startsWith("cassandra://"))
                 throw new Exception("Bad scheme.");
             String[] parts = location.split("/+");
-            ksname = parts[1];
-            cfname = parts[2];
+            names = new String[]{ parts[1], parts[2] };
         }
         catch (Exception e)
         {
             throw new IOException("Expected 
'cassandra://<keyspace>/<columnfamily>': " + e.getMessage());
         }
+        return names;
+    }
 
-        // and configure
-        SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT);
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
-        conf = job.getConfiguration();
-        ConfigHelper.setInputSlicePredicate(conf, predicate);
-        ConfigHelper.setInputColumnFamily(conf, ksname, cfname);
-
-        // check the environment for connection information
+    private void setConnectionInformation() throws IOException
+    {
         if (System.getenv(PIG_RPC_PORT) != null)
             ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
+        else
+            throw new IOException("PIG_RPC_PORT environment variable not set");
         if (System.getenv(PIG_INITIAL_ADDRESS) != null)
            ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+        else
+            throw new IOException("PIG_INITIAL_ADDRESS environment variable 
not set");
         if (System.getenv(PIG_PARTITIONER) != null)
             ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
+        else
+            throw new IOException("PIG_PARTITIONER environment variable not 
set");
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException
+    {
+        SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT);
+        SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
+        conf = job.getConfiguration();
+        ConfigHelper.setInputSlicePredicate(conf, predicate);
+        String[] names = parseLocation(location);
+        ConfigHelper.setInputColumnFamily(conf, names[0], names[1]);
+        setConnectionInformation();
     }
 
     @Override
@@ -165,4 +181,136 @@ public class CassandraStorage extends Lo
     {
         return location;
     }
+
+    /* StoreFunc methods */
+    public void setStoreFuncUDFContextSignature(String signature)
+    {
+    }
+
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+    {
+        return relativeToAbsolutePath(location, curDir);
+    }
+
+    public void setStoreLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        String[] names = parseLocation(location);
+        ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]);
+        setConnectionInformation();
+    }
+
+    public OutputFormat getOutputFormat()
+    {
+        return new ColumnFamilyOutputFormat();
+    }
+
+    public void checkSchema(ResourceSchema schema) throws IOException
+    {
+        // we don't care about types, they all get cast to ByteBuffers
+    }
+
+    public void prepareToWrite(RecordWriter writer)
+    {
+        this.writer = writer;
+    }
+
+    private ByteBuffer objToBB(Object o)
+    {
+        if (o == null)
+            return null;
+        if (o instanceof String)
+            o = new DataByteArray((String)o);
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+    }
+
+    public void putNext(Tuple t) throws ExecException, IOException
+    {
+        ByteBuffer key = objToBB(t.get(0));
+        DefaultDataBag pairs = (DefaultDataBag) t.get(1);
+        ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
+
+        try
+        {
+            for (Tuple pair : pairs)
+            {
+               Mutation mutation = new Mutation();
+               if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
+               {
+                   org.apache.cassandra.avro.SuperColumn sc = new org.apache.cassandra.avro.SuperColumn();
+                   sc.name = objToBB(pair.get(0));
+                   ArrayList<org.apache.cassandra.avro.Column> columns = new ArrayList<org.apache.cassandra.avro.Column>();
+                   for (Tuple subcol : (DefaultDataBag) pair.get(1))
+                   {
+                       org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column();
+                       column.name = objToBB(subcol.get(0));
+                       column.value = objToBB(subcol.get(1));
+                       column.timestamp = System.currentTimeMillis() * 1000;
+                       columns.add(column);
+                   }
+                   if (columns.isEmpty()) // a deletion
+                   {
+                       mutation.deletion = new Deletion();
+                       mutation.deletion.super_column = objToBB(pair.get(0));
+                       mutation.deletion.timestamp = System.currentTimeMillis() * 1000;
+                   }
+                   else
+                   {
+                       sc.columns = columns;
+                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+                       mutation.column_or_supercolumn.super_column = sc;
+                   }
+               }
+               else // assume column since it could be anything else
+               {
+                   if (pair.get(1) == null)
+                   {
+                       mutation.deletion = new Deletion();
+                       mutation.deletion.predicate = new org.apache.cassandra.avro.SlicePredicate();
+                       mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0)));
+                       mutation.deletion.timestamp = System.currentTimeMillis() * 1000;
+                   }
+                   else
+                   {
+                       org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column();
+                       column.name = objToBB(pair.get(0));
+                       column.value = objToBB(pair.get(1));
+                       column.timestamp = System.currentTimeMillis() * 1000;
+                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+                       mutation.column_or_supercolumn.column = column;
+                   }
+               }
+               mutationList.add(mutation);
+            }
+        }
+        catch (ClassCastException e)
+        {
+            throw new IOException(e + " Output must be (key, 
{(column,value)...}) for ColumnFamily or (key, 
{supercolumn:{(column,value)...}...}) for SuperColumnFamily");
+        }
+        try
+        {
+            writer.write(key, mutationList);
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    public void cleanupOnFailure(String failure, Job job)
+    {
+    }
+
+    /* LoadPushDown methods */
+
+    public List<OperatorSet> getFeatures()
+    {
+        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+    }
+
+    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException
+    {
+        return new RequiredFieldResponse(true);
+    }
+
 }

