Updated Branches: refs/heads/trunk fb5e8801f -> 8fb8343a9
Allow overriding default input/outputformats Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3826 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8fb8343a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8fb8343a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8fb8343a Branch: refs/heads/trunk Commit: 8fb8343a97bd60a3587d7d819f67a77828fda1de Parents: fb5e880 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Thu Feb 2 11:44:03 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Thu Feb 2 13:04:25 2012 -0600 ---------------------------------------------------------------------- .../cassandra/hadoop/pig/CassandraStorage.java | 42 +++++++++++++-- 1 files changed, 38 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8fb8343a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index af1a11a..827725f 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Hex; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -71,7 +72,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo public final static String PIG_RPC_PORT = "PIG_RPC_PORT"; public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; + public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT"; + public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT"; + private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat"; + private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat"; + private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; private static final Log logger = LogFactory.getLog(CassandraStorage.class); @@ -86,6 +92,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private Configuration conf; private RecordReader reader; private RecordWriter writer; + private String inputFormatClass; + private String outputFormatClass; private int limit; public CassandraStorage() @@ -248,7 +256,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo @Override public InputFormat getInputFormat() { - return new ColumnFamilyInputFormat(); + try + { + return FBUtilities.construct(inputFormatClass, "inputformat"); + } + catch (ConfigurationException e) + { + throw new RuntimeException(e); + } } @Override @@ -330,12 +345,24 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER)); ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER)); - } + } if(System.getenv(PIG_INPUT_PARTITIONER) != null) ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER)); if(System.getenv(PIG_OUTPUT_PARTITIONER) != null) ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER)); - + if (System.getenv(PIG_INPUT_FORMAT) != null) + inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT)); + else + inputFormatClass = DEFAULT_INPUT_FORMAT; + if (System.getenv(PIG_OUTPUT_FORMAT) != null) + outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT)); + else + outputFormatClass = DEFAULT_OUTPUT_FORMAT; + } + + private String getFullyQualifiedClassName(String classname) + { + return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname; } @Override @@ -506,7 +533,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo public OutputFormat getOutputFormat() { - return new ColumnFamilyOutputFormat(); + try + { + return FBUtilities.construct(outputFormatClass, "outputformat"); + } + catch (ConfigurationException e) + { + throw new RuntimeException(e); + } } public void checkSchema(ResourceSchema schema) throws IOException