Author: brandonwilliams Date: Fri Mar 4 00:41:55 2011 New Revision: 1076905
URL: http://svn.apache.org/viewvc?rev=1076905&view=rev Log: Pig storefunc. Patch by brandonwilliams, reviewed by Jeremy Hanna for CASSANDRA-1828. Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/contrib/pig/README.txt cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra cassandra/branches/cassandra-0.7/contrib/pig/build.xml cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1076905&r1=1076904&r2=1076905&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Mar 4 00:41:55 2011 @@ -2,6 +2,7 @@ * add nodetool join command (CASSANDRA-2160) * fix secondary indexes on pre-existing or streamed data (CASSANDRA-2244) * initialize endpoing in gossiper earlier (CASSANDRA-2228) + * add ability to write to Cassandra from Pig (CASSANDRA-1828) 0.7.3 * Keep endpoint state until aVeryLongTime (CASSANDRA-2115) Modified: cassandra/branches/cassandra-0.7/contrib/pig/README.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/README.txt?rev=1076905&r1=1076904&r2=1076905&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/README.txt (original) +++ cassandra/branches/cassandra-0.7/contrib/pig/README.txt Fri Mar 4 00:41:55 2011 @@ -1,4 +1,5 @@ -A Pig LoadFunc that reads all columns from a given ColumnFamily. +A Pig storage class that reads all columns from a given ColumnFamily, or writes +properly formatted results into a ColumnFamily. Setup: @@ -7,10 +8,15 @@ configuration and set the PIG_HOME and J variables to the location of a Pig >= 0.7.0 install and your Java install. 
+NOTE: if you intend to _output_ to Cassandra, until there is a Pig release that +uses jackson > 1.0.1 (see https://issues.apache.org/jira/browse/PIG-1863) you +will need to build Pig yourself with jackson 1.4. To do this, edit Pig's +ivy/libraries.properties, and run ant. + If you would like to run using the Hadoop backend, you should also set PIG_CONF_DIR to the location of your Hadoop config. -FInally, set the following as environment variables (uppercase, +Finally, set the following as environment variables (uppercase, underscored), or as Hadoop configuration variables (lowercase, dotted): * PIG_RPC_PORT or cassandra.thrift.port : the port thrift is listening on * PIG_INITIAL_ADDRESS or cassandra.thrift.address : initial address to connect to @@ -40,3 +46,11 @@ grunt> namecounts = FOREACH namegroups G grunt> orderednames = ORDER namecounts BY $0; grunt> topnames = LIMIT orderednames 50; grunt> dump topnames; + +Outputting to Cassandra requires the same format as its input, so the simplest example is: + +grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage(); +grunt> STORE rows into 'cassandra://Keyspace1/Standard2' USING CassandraStorage(); + +This will copy the ColumnFamily. Note that the destination ColumnFamily must +already exist for this to work. Modified: cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra?rev=1076905&r1=1076904&r2=1076905&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra (original) +++ cassandra/branches/cassandra-0.7/contrib/pig/bin/pig_cassandra Fri Mar 4 00:41:55 2011 @@ -38,7 +38,7 @@ if [ "x$PIG_HOME" = "x" ]; then fi # pig jar. -PIG_JAR=$PIG_HOME/pig*core.jar +PIG_JAR=$PIG_HOME/pig*.jar if [ !
-e $PIG_JAR ]; then echo "Unable to locate Pig jar" >&2 exit 1 Modified: cassandra/branches/cassandra-0.7/contrib/pig/build.xml URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/build.xml?rev=1076905&r1=1076904&r2=1076905&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/build.xml (original) +++ cassandra/branches/cassandra-0.7/contrib/pig/build.xml Fri Mar 4 00:41:55 2011 @@ -21,7 +21,7 @@ <!-- stores the environment for locating PIG_HOME --> <property environment="env" /> <property name="cassandra.dir" value="../.." /> - <property name="cassandra.lib" value="" /> + <property name="cassandra.lib" value="${cassandra.dir}/lib" /> <property name="cassandra.classes" value="${cassandra.dir}/build/classes" /> <property name="build.src" value="${basedir}/src" /> <property name="build.lib" value="${basedir}/lib" /> @@ -31,8 +31,9 @@ <path id="pig.classpath"> <fileset file="${env.PIG_HOME}/pig*.jar" /> - <fileset dir="${cassandra.dir}/lib"> + <fileset dir="${cassandra.lib}"> <include name="libthrift*.jar" /> + <include name="avro*.jar" /> </fileset> <fileset dir="${cassandra.dir}/build/lib/jars"> <include name="google-collections*.jar" /> Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1076905&r1=1076904&r2=1076905&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Mar 4 00:41:55 2011 @@ -20,33 +20,36 @@ import java.io.IOException; import java.nio.ByteBuffer; import 
java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.SuperColumn; import org.apache.cassandra.hadoop.*; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.SliceRange; +import org.apache.cassandra.avro.Mutation; +import org.apache.cassandra.avro.Deletion; +import org.apache.cassandra.avro.ColumnOrSuperColumn; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.*; -import org.apache.pig.LoadFunc; +import org.apache.pig.*; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.data.DefaultDataBag; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; +import org.apache.pig.data.*; +import org.apache.pig.impl.logicalLayer.FrontendException; /** * A LoadFunc wrapping ColumnFamilyInputFormat. * * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))). 
*/ -public class CassandraStorage extends LoadFunc +public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown { // system environment variables that can be set to configure connection info: // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper @@ -56,9 +59,11 @@ public class CassandraStorage extends Lo private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; private final static int LIMIT = 1024; + private static final Log logger = LogFactory.getLog(CassandraStorage.class); private Configuration conf; private RecordReader reader; + private RecordWriter writer; @Override public Tuple getNext() throws IOException @@ -116,8 +121,7 @@ public class CassandraStorage extends Lo @Override public InputFormat getInputFormat() { - ColumnFamilyInputFormat inputFormat = new ColumnFamilyInputFormat(); - return inputFormat; + return new ColumnFamilyInputFormat(); } @Override @@ -126,38 +130,50 @@ public class CassandraStorage extends Lo this.reader = reader; } - @Override - public void setLocation(String location, Job job) throws IOException + private String[] parseLocation(String location) throws IOException { // parse uri into keyspace and columnfamily - String ksname, cfname; + String names[]; try { if (!location.startsWith("cassandra://")) throw new Exception("Bad scheme."); String[] parts = location.split("/+"); - ksname = parts[1]; - cfname = parts[2]; + names = new String[]{ parts[1], parts[2] }; } catch (Exception e) { throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>': " + e.getMessage()); } + return names; + } - // and configure - SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT); - SlicePredicate predicate = new SlicePredicate().setSlice_range(range); - conf = job.getConfiguration(); - ConfigHelper.setInputSlicePredicate(conf, predicate); - ConfigHelper.setInputColumnFamily(conf, ksname, cfname); - - // check the environment for connection information + 
private void setConnectionInformation() throws IOException + { if (System.getenv(PIG_RPC_PORT) != null) ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT)); if (System.getenv(PIG_INITIAL_ADDRESS) != null) ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); if (System.getenv(PIG_PARTITIONER) != null) ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER)); + } + + @Override + public void setLocation(String location, Job job) throws IOException + { + SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT); + SlicePredicate predicate = new SlicePredicate().setSlice_range(range); + conf = job.getConfiguration(); + ConfigHelper.setInputSlicePredicate(conf, predicate); + String[] names = parseLocation(location); + ConfigHelper.setInputColumnFamily(conf, names[0], names[1]); + setConnectionInformation(); } @Override @@ -165,4 +181,136 @@ { return location; } + + /* StoreFunc methods */ + public void setStoreFuncUDFContextSignature(String signature) + { + } + + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException + { + return relativeToAbsolutePath(location, curDir); + } + + public void setStoreLocation(String location, Job job) throws IOException + { + conf = job.getConfiguration(); + String[] names = parseLocation(location); + ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]); + setConnectionInformation(); + } + + public OutputFormat getOutputFormat() + { + return new ColumnFamilyOutputFormat(); + } + + public void checkSchema(ResourceSchema schema) throws IOException + { + // we don't care about types, they all get casted to ByteBuffers + } + + public void prepareToWrite(RecordWriter writer) + { +
this.writer = writer; + } + + private ByteBuffer objToBB(Object o) + { + if (o == null) + return (ByteBuffer)o; + if (o instanceof java.lang.String) + o = new DataByteArray((String)o); + return ByteBuffer.wrap(((DataByteArray) o).get()); + } + + public void putNext(Tuple t) throws ExecException, IOException + { + ByteBuffer key = objToBB(t.get(0)); + DefaultDataBag pairs = (DefaultDataBag) t.get(1); + ArrayList<Mutation> mutationList = new ArrayList<Mutation>(); + + try + { + for (Tuple pair : pairs) + { + Mutation mutation = new Mutation(); + if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn + { + org.apache.cassandra.avro.SuperColumn sc = new org.apache.cassandra.avro.SuperColumn(); + sc.name = objToBB(pair.get(0)); + ArrayList<org.apache.cassandra.avro.Column> columns = new ArrayList<org.apache.cassandra.avro.Column>(); + for (Tuple subcol : (DefaultDataBag) pair.get(1)) + { + org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column(); + column.name = objToBB(subcol.get(0)); + column.value = objToBB(subcol.get(1)); + column.timestamp = System.currentTimeMillis() * 1000; + columns.add(column); + } + if (columns.isEmpty()) // a deletion + { + mutation.deletion = new Deletion(); + mutation.deletion.super_column = objToBB(pair.get(0)); + mutation.deletion.timestamp = System.currentTimeMillis() * 1000; + } + else + { + sc.columns = columns; + mutation.column_or_supercolumn = new ColumnOrSuperColumn(); + mutation.column_or_supercolumn.super_column = sc; + } + } + else // assume column since it could be anything else + { + if (pair.get(1) == null) + { + mutation.deletion = new Deletion(); + mutation.deletion.predicate = new org.apache.cassandra.avro.SlicePredicate(); + mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0))); + mutation.deletion.timestamp = System.currentTimeMillis() * 1000; + } + else + { + org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column(); + column.name = 
objToBB(pair.get(0)); + column.value = objToBB(pair.get(1)); + column.timestamp = System.currentTimeMillis() * 1000; + mutation.column_or_supercolumn = new ColumnOrSuperColumn(); + mutation.column_or_supercolumn.column = column; + } + } + mutationList.add(mutation); + } + } + catch (ClassCastException e) + { + throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily"); + } + try + { + writer.write(key, mutationList); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + } + + public void cleanupOnFailure(String failure, Job job) + { + } + + /* LoadPushDown methods */ + + public List<OperatorSet> getFeatures() { + return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION); + } + + public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException + { + return new RequiredFieldResponse(true); + } + }